[FLINK-6013][metrics] Add Datadog HTTP metrics reporter

This closes #3736.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54ceec16
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54ceec16
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54ceec16

Branch: refs/heads/master
Commit: 54ceec16c11655da4181c0816a3b12d1c4bab465
Parents: 50baec6
Author: Bowen Li <[email protected]>
Authored: Tue Apr 18 10:27:17 2017 -0700
Committer: zentol <[email protected]>
Committed: Tue May 9 22:56:49 2017 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  24 +++
 flink-dist/pom.xml                              |   7 +
 flink-dist/src/main/assemblies/opt.xml          |   7 +
 flink-metrics/flink-metrics-datadog/pom.xml     | 108 ++++++++++
 .../apache/flink/metrics/datadog/DCounter.java  |  44 ++++
 .../apache/flink/metrics/datadog/DGauge.java    |  45 ++++
 .../apache/flink/metrics/datadog/DMeter.java    |  42 ++++
 .../apache/flink/metrics/datadog/DMetric.java   |  84 ++++++++
 .../apache/flink/metrics/datadog/DSeries.java   |  45 ++++
 .../metrics/datadog/DatadogHttpClient.java      |  97 +++++++++
 .../metrics/datadog/DatadogHttpReporter.java    | 210 +++++++++++++++++++
 .../flink/metrics/datadog/MetricType.java       |  30 +++
 .../metrics/datadog/DatadogHttpClientTest.java  | 199 ++++++++++++++++++
 .../src/test/resources/log4j-test.properties    |  27 +++
 flink-metrics/pom.xml                           |   1 +
 15 files changed, 970 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 290a452..2bc65a6 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -436,6 +436,30 @@ metrics.reporter.stsd.port: 8125
 
 {% endhighlight %}
 
+### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)
+
+In order to use this reporter you must copy 
`/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Note any variables in Flink metrics, such as `<host>`, `<job_name>`, 
`<tm_id>`, `<subtask_index>`, `<task_name>`, and `<operator_name>`,
+will be sent to Datadog as tags. Tags will look like `host:localhost` and 
`job_name:myjobname`.
+
+Parameters:
+
+- `apikey` - the Datadog API key
+- `tags` - (optional) the global tags that will be applied to metrics when 
sending to Datadog. Tags should be separated by comma only
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: dghttp
+metrics.reporter.dghttp.class: 
org.apache.flink.metrics.datadog.DatadogHttpReporter
+metrics.reporter.dghttp.apikey: xxx
+metrics.reporter.dghttp.tags: myflinkapp,prod
+
+{% endhighlight %}
+
 ## System metrics
 
 By default Flink gathers several metrics that provide deep insights on the 
current state.

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 9773991..6d8debf 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -202,6 +202,13 @@ under the License.
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-datadog</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
                <!-- end optional Flink metrics reporters -->
 
                <!-- start optional Flink libraries -->

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml 
b/flink-dist/src/main/assemblies/opt.xml
index 95218d7..0386b92 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -105,6 +105,13 @@
                </file>
 
                <file>
+                       
<source>../flink-metrics/flink-metrics-datadog/target/flink-metrics-datadog-${project.version}-shaded.jar</source>
+                       <outputDirectory>opt/</outputDirectory>
+                       
<destName>flink-metrics-datadog-${project.version}.jar</destName>
+                       <fileMode>0644</fileMode>
+               </file>
+
+               <file>
                        
<source>../flink-shaded-hadoop/flink-shaded-hadoop2/target/flink-shaded-hadoop2-${project.version}.jar</source>
                        <outputDirectory>opt/</outputDirectory>
                        
<destName>flink-shaded-hadoop2-${project.version}.jar</destName>

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/pom.xml 
b/flink-metrics/flink-metrics-datadog/pom.xml
new file mode 100644
index 0000000..0d473fc
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-metrics</artifactId>
+               <version>1.4-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-metrics-datadog</artifactId>
+       <name>flink-metrics-datadog</name>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.fasterxml.jackson.core</groupId>
+                       <artifactId>jackson-databind</artifactId>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.squareup.okhttp3</groupId>
+                       <artifactId>okhttp</artifactId>
+                       <version>3.7.0</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.squareup.okio</groupId>
+                       <artifactId>okio</artifactId>
+                       <version>1.12.0</version>
+               </dependency>
+
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<shadedArtifactAttached>true</shadedArtifactAttached>
+                                                       <relocations 
combine.children="append">
+                                                               <relocation>
+                                                                       
<pattern>okhttp3</pattern>
+                                                                       
<shadedPattern>org.apache.flink.shaded.okhttp3</shadedPattern>
+                                                               </relocation>
+                                                               <relocation>
+                                                                       
<pattern>okio</pattern>
+                                                                       
<shadedPattern>org.apache.flink.shaded.okio</shadedPattern>
+                                                               </relocation>
+                                                       </relocations>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
new file mode 100644
index 0000000..58abbd6
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Counter;
+
+import java.util.List;
+
+/**
+ * Mapping of counter between Flink and Datadog
+ * */
+public class DCounter extends DMetric {
+       private final Counter counter;
+
+       public DCounter(Counter c, String metricName, String host, List<String> 
tags) {
+               super(MetricType.counter, metricName, host, tags);
+               counter = c;
+       }
+
+       /**
+        * Visibility of this method must not be changed
+        * since we deliberately not map it to json object in a Datadog-defined 
format
+        * */
+       @Override
+       public Number getMetricValue() {
+               return counter.getCount();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
new file mode 100644
index 0000000..8deb117
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+
+import org.apache.flink.metrics.Gauge;
+
+import java.util.List;
+
+/**
+ * Mapping of gauge between Flink and Datadog
+ * */
+public class DGauge extends DMetric {
+       private final Gauge<Number> gauge;
+
+       public DGauge(Gauge<Number> g, String metricName, String host, 
List<String> tags) {
+               super(MetricType.gauge, metricName, host, tags);
+               gauge = g;
+       }
+
+       /**
+        * Visibility of this method must not be changed
+        * since we deliberately not map it to json object in a Datadog-defined 
format
+        * */
+       @Override
+       public Number getMetricValue() {
+               return gauge.getValue();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
new file mode 100644
index 0000000..181a00c
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Meter;
+
+import java.util.List;
+
+/**
+ * Mapping of meter between Flink and Datadog
+ *
+ * Only consider rate of the meter, due to Datadog HTTP API's limited support 
of meter
+ * */
+public class DMeter extends DMetric {
+       private final Meter meter;
+
+       public DMeter(Meter m, String metricName, String host, List<String> 
tags) {
+               super(MetricType.gauge, metricName, host, tags);
+               meter = m;
+       }
+
+       @Override
+       public Number getMetricValue() {
+               return meter.getRate();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
new file mode 100644
index 0000000..3f9d6ff
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract metric of Datadog for serialization
+ * */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public abstract class DMetric {
+       private static final long MILLIS_TO_SEC = 1000L;
+
+       /**
+        * Names of metric/type/tags field and their getters must not be changed
+        * since they are mapped to json objects in a Datadog-defined format
+        * */
+       private final String metric; // Metric name
+       private final MetricType type;
+       private final String host;
+       private final List<String> tags;
+
+       public DMetric(MetricType metricType, String metric, String host, 
List<String> tags) {
+               this.type = metricType;
+               this.metric = metric;
+               this.host = host;
+               this.tags = tags;
+       }
+
+       public MetricType getType() {
+               return type;
+       }
+
+       public String getMetric() {
+               return metric;
+       }
+
+       public String getHost() {
+               return host;
+       }
+
+       public List<String> getTags() {
+               return tags;
+       }
+
+       public List<List<Number>> getPoints() {
+               // One single data point
+               List<Number> point = new ArrayList<>();
+               point.add(getUnixEpochTimestamp());
+               point.add(getMetricValue());
+
+               List<List<Number>> points = new ArrayList<>();
+               points.add(point);
+
+               return points;
+       }
+
+       @JsonIgnore
+       public abstract Number getMetricValue();
+
+       public static long getUnixEpochTimestamp() {
+               return (System.currentTimeMillis() / MILLIS_TO_SEC);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
new file mode 100644
index 0000000..fb0bb09
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Json serialization between Flink and Datadog
+ **/
+public class DSeries {
+       /**
+        * Names of series field and its getters must not be changed
+        * since they are mapped to json objects in a Datadog-defined format
+        * */
+       private List<DMetric> series;
+
+       public DSeries() {
+               series = new ArrayList<>();
+       }
+
+       public void addMetric(DMetric metric) {
+               series.add(metric);
+       }
+
+       public List<DMetric> getSeries() {
+               return series;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
new file mode 100644
index 0000000..dfbcee1
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.RequestBody;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Http client talking to Datadog
+ * */
+public class DatadogHttpClient{
+       private static final String SERIES_URL_FORMAT = 
"https://app.datadoghq.com/api/v1/series?api_key=%s";;
+       private static final String VALIDATE_URL_FORMAT = 
"https://app.datadoghq.com/api/v1/validate?api_key=%s";;
+       private static final MediaType MEDIA_TYPE = 
MediaType.parse("application/json; charset=utf-8");
+       private static final int TIMEOUT = 3;
+       private static final ObjectMapper MAPPER = new ObjectMapper();
+
+       private final String seriesUrl;
+       private final String validateUrl;
+       private final OkHttpClient client;
+       private final String apiKey;
+
+       public DatadogHttpClient(String dgApiKey) {
+               if (dgApiKey == null || dgApiKey.isEmpty()) {
+                       throw new IllegalArgumentException("Invalid API key:" + 
dgApiKey);
+               }
+
+               apiKey = dgApiKey;
+               client = new OkHttpClient.Builder()
+                       .connectTimeout(TIMEOUT, TimeUnit.SECONDS)
+                       .writeTimeout(TIMEOUT, TimeUnit.SECONDS)
+                       .readTimeout(TIMEOUT, TimeUnit.SECONDS)
+                       .build();
+
+               seriesUrl = String.format(SERIES_URL_FORMAT, apiKey);
+               validateUrl = String.format(VALIDATE_URL_FORMAT, apiKey);
+               validateApiKey();
+       }
+
+       private void validateApiKey() {
+               Request r = new 
Request.Builder().url(validateUrl).get().build();
+
+               try {
+                       Response response = client.newCall(r).execute();
+                       if (!response.isSuccessful()) {
+                               throw new IllegalArgumentException(
+                                       String.format("API key: %s is invalid", 
apiKey));
+                       }
+               } catch(IOException e) {
+                       throw new IllegalStateException("Failed contacting 
Datadog to validate API key", e);
+               }
+       }
+
+       public void send(DatadogHttpReporter.DatadogHttpRequest request) throws 
Exception {
+               String postBody = serialize(request.getSeries());
+
+               Request r = new Request.Builder()
+                       .url(seriesUrl)
+                       .post(RequestBody.create(MEDIA_TYPE, postBody))
+                       .build();
+
+               client.newCall(r).execute().close();
+       }
+
+       public static String serialize(Object obj) throws 
JsonProcessingException {
+               return MAPPER.writeValueAsString(obj);
+       }
+
+       public void close() {
+               client.dispatcher().executorService().shutdown();
+               client.connectionPool().evictAll();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
new file mode 100644
index 0000000..fcb5c4b
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, Scheduled {
+       private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+       private static final String HOST_VARIABLE = "<host>";
+
+       // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+       private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+       private final Map<Counter, DCounter> counters = new 
ConcurrentHashMap<>();
+       private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+       private DatadogHttpClient client;
+       private List<String> configTags;
+
+       public static final String API_KEY = "apikey";
+       public static final String TAGS = "tags";
+
+       @Override
+       public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+               final String name = group.getMetricIdentifier(metricName);
+
+               List<String> tags = new ArrayList<>(configTags);
+               tags.addAll(getTagsFromMetricGroup(group));
+               String host = getHostFromMetricGroup(group);
+
+               if (metric instanceof Counter) {
+                       Counter c = (Counter) metric;
+                       counters.put(c, new DCounter(c, name, host, tags));
+               } else if (metric instanceof Gauge) {
+                       Gauge g = (Gauge) metric;
+                       gauges.put(g, new DGauge(g, name, host, tags));
+               } else if (metric instanceof Meter) {
+                       Meter m = (Meter) metric;
+                       // Only consider rate
+                       meters.put(m, new DMeter(m, name, host, tags));
+               } else if (metric instanceof Histogram) {
+                       LOGGER.warn("Cannot add {} because Datadog HTTP API 
doesn't support Histogram", metricName);
+               } else {
+                       LOGGER.warn("Cannot add unknown metric type {}. This 
indicates that the reporter " +
+                               "does not support this metric type.", 
metric.getClass().getName());
+               }
+       }
+
+       @Override
+       public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
+               if (metric instanceof Counter) {
+                       counters.remove(metric);
+               } else if (metric instanceof Gauge) {
+                       gauges.remove(metric);
+               } else if (metric instanceof Meter) {
+                       meters.remove(metric);
+               } else if (metric instanceof Histogram) {
+                       // No Histogram is registered
+               } else {
+                       LOGGER.warn("Cannot remove unknown metric type {}. This 
indicates that the reporter " +
+                               "does not support this metric type.", 
metric.getClass().getName());
+               }
+       }
+
+       @Override
+       public void open(MetricConfig config) {
+               client = new DatadogHttpClient(config.getString(API_KEY, null));
+               LOGGER.info("Configured DatadogHttpReporter");
+
+               configTags = getTagsFromConfig(config.getString(TAGS, ""));
+       }
+
+       @Override
+       public void close() {
+               client.close();
+               LOGGER.info("Shut down DatadogHttpReporter");
+       }
+
+       @Override
+       public void report() {
+               DatadogHttpRequest request = new DatadogHttpRequest();
+
+               for (Map.Entry<Gauge, DGauge> entry : gauges.entrySet()) {
+                       DGauge g = entry.getValue();
+                       try {
+                               // Will throw exception if the Gauge is not of 
Number type
+                               // Flink uses Gauge to store many types other 
than Number
+                               g.getMetricValue();
+                               request.addGauge(g);
+                       } catch (Exception e) {
+                               // Remove that Gauge if it's not of Number type
+                               gauges.remove(entry.getKey());
+                       }
+               }
+
+               for (DCounter c : counters.values()) {
+                       request.addCounter(c);
+               }
+
+               for (DMeter m : meters.values()) {
+                       request.addMeter(m);
+               }
+
+               try {
+                       client.send(request);
+               } catch (Exception e) {
+                       LOGGER.warn("Failed reporting metrics to Datadog.", e);
+               }
+       }
+
+       /**
+        * Get config tags from config 'metrics.reporter.dghttp.tags'
+        * */
+       private List<String> getTagsFromConfig(String str) {
+               return Arrays.asList(str.split(","));
+       }
+
+       /**
+        * Get tags from MetricGroup#getAllVariables(), excluding 'host'
+        * */
+       private List<String> getTagsFromMetricGroup(MetricGroup metricGroup) {
+               List<String> tags = new ArrayList<>();
+
+               for (Map.Entry<String, String> entry: 
metricGroup.getAllVariables().entrySet()) {
+                       if(!entry.getKey().equals(HOST_VARIABLE)) {
+                               tags.add(getVariableName(entry.getKey()) + ":" 
+ entry.getValue());
+                       }
+               }
+
+               return tags;
+       }
+
+       /**
+        * Get host from MetricGroup#getAllVariables() if it exists; returns 
Null otherwise
+        * */
+       private String getHostFromMetricGroup(MetricGroup metricGroup) {
+               return metricGroup.getAllVariables().get(HOST_VARIABLE);
+       }
+
+       /**
+        * Given "<xxx>", return "xxx"
+        * */
+       private String getVariableName(String str) {
+               return str.substring(1, str.length() - 1);
+       }
+
+       /**
+        * Compact metrics in batch, serialize them, and send to Datadog via 
HTTP
+        * */
+       static class DatadogHttpRequest {
+               private final DSeries series;
+
+               public DatadogHttpRequest() {
+                       series = new DSeries();
+               }
+
+               public void addGauge(DGauge gauge) {
+                       series.addMetric(gauge);
+               }
+
+               public void addCounter(DCounter counter) {
+                       series.addMetric(counter);
+               }
+
+               public void addMeter(DMeter meter) {
+                       series.addMetric(meter);
+               }
+
+               public DSeries getSeries() {
+                       return series;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
new file mode 100644
index 0000000..97f9b29
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+/**
+ * Metric types supported by Datadog
+ * */
+public enum MetricType {
+       /**
+        * Names of 'gauge' and 'counter' must not be changed
+        * since they are mapped to json objects in a Datadog-defined format
+        * */
+       gauge, counter
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
 
b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
new file mode 100644
index 0000000..bda5d47
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Enclosed.class)
+public class DatadogHttpClientTest {
+       public static class TestApiKey {
+               @Test(expected = IllegalArgumentException.class)
+               public void testClientWithEmptyKey() {
+                       new DatadogHttpClient("");
+               }
+
+               @Test(expected = IllegalArgumentException.class)
+               public void testClientWithNullKey() {
+                       new DatadogHttpClient(null);
+               }
+       }
+
+       @RunWith(PowerMockRunner.class)
+       @PrepareForTest(DMetric.class)
+       public static class TestSerialization {
+               private static List<String> tags = Arrays.asList("tag1", 
"tag2");
+
+               private static final long MOCKED_SYSTEM_MILLIS = 123L;
+
+               @Before
+               public void mockSystemMillis() {
+                       PowerMockito.mockStatic(DMetric.class);
+                       
PowerMockito.when(DMetric.getUnixEpochTimestamp()).thenReturn(MOCKED_SYSTEM_MILLIS);
+               }
+
+               @Test
+               public void serializeGauge() throws JsonProcessingException {
+
+                       DGauge g = new DGauge(new Gauge<Number>() {
+                               @Override
+                               public Number getValue() {
+                                       return 1;
+                               }
+                       }, "testCounter", "localhost", tags);
+
+                       assertEquals(
+                               
"{\"metric\":\"testCounter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+                               DatadogHttpClient.serialize(g));
+               }
+
+               @Test
+               public void serializeGaugeWithoutHost() throws 
JsonProcessingException {
+
+                       DGauge g = new DGauge(new Gauge<Number>() {
+                               @Override
+                               public Number getValue() {
+                                       return 1;
+                               }
+                       }, "testCounter", null, tags);
+
+                       assertEquals(
+                               
"{\"metric\":\"testCounter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+                               DatadogHttpClient.serialize(g));
+               }
+
+               @Test
+               public void serializeCounter() throws JsonProcessingException {
+                       DCounter c = new DCounter(new Counter() {
+                               @Override
+                               public void inc() {}
+
+                               @Override
+                               public void inc(long n) {}
+
+                               @Override
+                               public void dec() {}
+
+                               @Override
+                               public void dec(long n) {}
+
+                               @Override
+                               public long getCount() {
+                                       return 1;
+                               }
+                       }, "testCounter", "localhost", tags);
+
+                       assertEquals(
+                               
"{\"metric\":\"testCounter\",\"type\":\"counter\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+                               DatadogHttpClient.serialize(c));
+               }
+
+               @Test
+               public void serializeCounterWithoutHost() throws 
JsonProcessingException {
+                       DCounter c = new DCounter(new Counter() {
+                               @Override
+                               public void inc() {}
+
+                               @Override
+                               public void inc(long n) {}
+
+                               @Override
+                               public void dec() {}
+
+                               @Override
+                               public void dec(long n) {}
+
+                               @Override
+                               public long getCount() {
+                                       return 1;
+                               }
+                       }, "testCounter", null, tags);
+
+                       assertEquals(
+                               
"{\"metric\":\"testCounter\",\"type\":\"counter\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+                               DatadogHttpClient.serialize(c));
+               }
+
+               @Test
+               public void serializeMeter() throws JsonProcessingException {
+
+                       DMeter m = new DMeter(new Meter() {
+                               @Override
+                               public void markEvent() {}
+
+                               @Override
+                               public void markEvent(long n) {}
+
+                               @Override
+                               public double getRate() {
+                                       return 1;
+                               }
+
+                               @Override
+                               public long getCount() {
+                                       return 0;
+                               }
+                       }, "testMeter","localhost", tags);
+
+                       assertEquals(
+                               
"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
+                               DatadogHttpClient.serialize(m));
+               }
+
+               @Test
+               public void serializeMeterWithoutHost() throws 
JsonProcessingException {
+
+                       DMeter m = new DMeter(new Meter() {
+                               @Override
+                               public void markEvent() {}
+
+                               @Override
+                               public void markEvent(long n) {}
+
+                               @Override
+                               public double getRate() {
+                                       return 1;
+                               }
+
+                               @Override
+                               public long getCount() {
+                                       return 0;
+                               }
+                       }, "testMeter", null, tags);
+
+                       assertEquals(
+                               
"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
+                               DatadogHttpClient.serialize(m));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties 
b/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
index 317dde8..e1b66c2 100644
--- a/flink-metrics/pom.xml
+++ b/flink-metrics/pom.xml
@@ -40,6 +40,7 @@ under the License.
                <module>flink-metrics-graphite</module>
                <module>flink-metrics-jmx</module>
                <module>flink-metrics-statsd</module>
+               <module>flink-metrics-datadog</module>
        </modules>
 
        <!-- override these root dependencies as 'provided', so they don't end 
up

Reply via email to