Repository: flink Updated Branches: refs/heads/master 8ed368582 -> 707606ac4
http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java index 7b3286d..6b9a5fc 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java @@ -17,15 +17,112 @@ */ package org.apache.flink.metrics.groups; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.util.AbstractID; import org.junit.Test; import java.util.List; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; public class TaskManagerGroupTest { + + // ------------------------------------------------------------------------ + // adding and removing jobs + // ------------------------------------------------------------------------ + + @Test + public void addAndRemoveJobs() { + final TaskManagerMetricGroup group = new TaskManagerMetricGroup( + new MetricRegistry(new Configuration()), "localhost", new AbstractID().toString()); + + + final JobID jid1 = new JobID(); + final JobID jid2 = new JobID(); + + final String jobName1 = "testjob"; + final String jobName2 = "anotherJob"; + + final AbstractID vertex11 = new AbstractID(); + final AbstractID vertex12 = new AbstractID(); + final AbstractID vertex13 = new AbstractID(); + final AbstractID vertex21 = new AbstractID(); + + final AbstractID execution11 = new AbstractID(); + final AbstractID execution12 = new AbstractID(); + final AbstractID execution13 = new AbstractID(); + final AbstractID execution21 = new AbstractID(); + + TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, vertex11, execution11, 17, "test"); + TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, vertex12, execution12, 13, "test"); + TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, vertex21, execution21, 7, "test"); + + assertEquals(2, group.numRegisteredJobMetricGroups()); + assertFalse(tmGroup11.parent().isClosed()); + assertFalse(tmGroup12.parent().isClosed()); + assertFalse(tmGroup21.parent().isClosed()); + + // close all for job 2 and one from job 1 + tmGroup11.close(); + tmGroup21.close(); + assertTrue(tmGroup11.isClosed()); + assertTrue(tmGroup21.isClosed()); + + // job 2 should be removed, job should still be there + assertFalse(tmGroup11.parent().isClosed()); + assertFalse(tmGroup12.parent().isClosed()); + assertTrue(tmGroup21.parent().isClosed()); + assertEquals(1, group.numRegisteredJobMetricGroups()); + + // add one more to job one + TaskMetricGroup tmGroup13 = group.addTaskForJob(jid1, jobName1, vertex13, execution13, 0, "test"); + tmGroup12.close(); + tmGroup13.close(); + + assertTrue(tmGroup11.parent().isClosed()); + assertTrue(tmGroup12.parent().isClosed()); + assertTrue(tmGroup13.parent().isClosed()); + + assertEquals(0, group.numRegisteredJobMetricGroups()); + } + + @Test + public void testCloseClosesAll() { + final TaskManagerMetricGroup group = new TaskManagerMetricGroup( + new MetricRegistry(new Configuration()), "localhost", new AbstractID().toString()); + + + final JobID jid1 = new JobID(); + final JobID jid2 = new JobID(); + + final String jobName1 = "testjob"; + final String jobName2 = "anotherJob"; + + final AbstractID vertex11 = new AbstractID(); + final AbstractID vertex12 = new AbstractID(); + final AbstractID vertex21 = new AbstractID(); + + final AbstractID execution11 = new AbstractID(); + final AbstractID execution12 = new AbstractID(); + final AbstractID execution21 = new AbstractID(); + + TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, vertex11, execution11, 17, "test"); + TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, vertex12, execution12, 13, "test"); + TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, vertex21, execution21, 7, "test"); + + group.close(); + + assertTrue(tmGroup11.isClosed()); + assertTrue(tmGroup12.isClosed()); + assertTrue(tmGroup21.isClosed()); + } + + // ------------------------------------------------------------------------ + // scope tests + // ------------------------------------------------------------------------ + @Test public void testGenerateScopeDefault() { MetricRegistry registry = new MetricRegistry(new Configuration()); http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java index d607072..ab78288 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java @@ -24,6 +24,7 @@ import org.apache.flink.metrics.groups.JobMetricGroup; import org.apache.flink.util.AbstractID; public class DummyJobMetricGroup extends JobMetricGroup { + public DummyJobMetricGroup() { super(new DummyMetricRegistry(), new DummyTaskManagerMetricGroup(), new JobID(), "job"); } @@ -34,14 +35,10 @@ public class DummyJobMetricGroup extends JobMetricGroup { } @Override - protected MetricGroup addMetric(String name, Metric metric) { - return this; - } + public void removeTaskMetricGroup(AbstractID executionId) {} @Override - public MetricGroup addGroup(int name) { - return addGroup("" + name); - } + protected void addMetric(String name, Metric metric) {} @Override public MetricGroup addGroup(String name) { http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java index 26df874..77ddd17 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java @@ -19,6 +19,7 @@ package org.apache.flink.metrics.util; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.MetricRegistry; import org.apache.flink.metrics.groups.AbstractMetricGroup; import org.apache.flink.metrics.groups.Scope; @@ -26,8 +27,13 @@ import java.util.ArrayList; import java.util.List; public class DummyMetricGroup extends AbstractMetricGroup { + public DummyMetricGroup() { - super(new DummyMetricRegistry()); + this(new DummyMetricRegistry()); + } + + public DummyMetricGroup(MetricRegistry registry) { + super(registry); } @Override @@ -41,14 +47,8 @@ public class DummyMetricGroup extends AbstractMetricGroup { } @Override - protected MetricGroup addMetric(String name, Metric metric) { - return this; - } - - @Override - public MetricGroup addGroup(int name) { - return addGroup("" + name); - } + protected void addMetric(String name, Metric metric) {} + @Override public MetricGroup addGroup(String name) { http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java index f0d6d3f..f8b73a9 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java @@ -15,18 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics.util; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricRegistry; public class DummyMetricRegistry extends MetricRegistry { - private static final Configuration config; - - static { - config = new Configuration(); - config.setString(KEY_METRICS_REPORTER_CLASS, DummyReporter.class.getCanonicalName()); - } public DummyMetricRegistry() { super(new Configuration()); http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java index eb45f6a..e271d6e 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java @@ -22,19 +22,13 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.OperatorMetricGroup; public class DummyOperatorMetricGroup extends OperatorMetricGroup { + public DummyOperatorMetricGroup() { super(new DummyMetricRegistry(), new DummyTaskMetricGroup(), "operator", 0); } @Override - protected MetricGroup addMetric(String name, Metric metric) { - return this; - } - - @Override - public MetricGroup addGroup(int name) { - return addGroup("" + name); - } + protected void addMetric(String name, Metric metric) {} @Override public MetricGroup addGroup(String name) { http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java index 1c7d33b..8b7714f 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java @@ -23,6 +23,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.TaskManagerMetricGroup; public class DummyTaskManagerMetricGroup extends TaskManagerMetricGroup { + public DummyTaskManagerMetricGroup() { super(new DummyMetricRegistry(), "host", "id"); } @@ -32,14 +33,7 @@ public class DummyTaskManagerMetricGroup extends TaskManagerMetricGroup { } @Override - protected MetricGroup addMetric(String name, Metric metric) { - return this; - } - - @Override - public MetricGroup addGroup(int name) { - return addGroup("" + name); - } + protected void addMetric(String name, Metric metric) {} @Override public MetricGroup addGroup(String name) { http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java index 53683f4..db2c557 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java @@ -23,6 +23,7 @@ import org.apache.flink.metrics.groups.TaskMetricGroup; import org.apache.flink.util.AbstractID; public class DummyTaskMetricGroup extends TaskMetricGroup { + public DummyTaskMetricGroup() { super(new DummyMetricRegistry(), new DummyJobMetricGroup(), new AbstractID(), new AbstractID(), 0, "task"); } @@ -32,14 +33,7 @@ public class DummyTaskMetricGroup extends TaskMetricGroup { } @Override - protected MetricGroup addMetric(String name, Metric metric) { - return this; - } - - @Override - public MetricGroup addGroup(int name) { - return addGroup("" + name); - } + protected void addMetric(String name, Metric metric) {} @Override public MetricGroup addGroup(String name) { http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java index 482d1e8..5d8a8e0 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java @@ -15,23 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics.util; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.reporter.AbstractReporter; -import org.apache.flink.metrics.reporter.MetricReporter; import java.util.List; public class TestReporter extends AbstractReporter { + @Override - public void open(Configuration config) { - } + public void open(Configuration config) {} @Override - public void close() { - } + public void close() {} @Override public String generateName(String name, List<String> scope) { http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-dropwizard/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-dropwizard/pom.xml b/flink-metric-reporters/flink-metrics-dropwizard/pom.xml new file mode 100644 index 0000000..a386880 --- /dev/null +++ b/flink-metric-reporters/flink-metrics-dropwizard/pom.xml @@ -0,0 +1,72 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metric-reporters</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-metrics-dropwizard</artifactId> + <name>flink-metrics-dropwizard</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${metrics.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.4</version> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java new file mode 100644 index 0000000..059704d --- /dev/null +++ b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.dropwizard; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.dropwizard.metrics.CounterWrapper; +import org.apache.flink.dropwizard.metrics.GaugeWrapper; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; + +import java.util.List; + +/** + * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} that wraps a + * Dropwizard {@link com.codahale.metrics.Reporter}. + */ +@PublicEvolving +public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled { + protected MetricRegistry registry; + protected ScheduledReporter reporter; + + public static final String ARG_HOST = "host"; + public static final String ARG_PORT = "port"; + public static final String ARG_PREFIX = "prefix"; + public static final String ARG_CONVERSION_RATE = "rateConversion"; + public static final String ARG_CONVERSION_DURATION = "durationConversion"; + + protected ScheduledDropwizardReporter() { + this.registry = new MetricRegistry(); + } + + @Override + public synchronized void notifyOfAddedMetric(Metric metric, String name) { + if (metric instanceof Counter) { + registry.register(name, new CounterWrapper((Counter) metric)); + } else if (metric instanceof Gauge) { + registry.register(name, new GaugeWrapper((Gauge<?>) metric)); + } + } + + @Override + public synchronized void notifyOfRemovedMetric(Metric metric, String name) { + registry.remove(name); + } + + public abstract ScheduledReporter getReporter(Configuration config); + + @Override + public void open(Configuration config) { + this.reporter = getReporter(config); + } + + @Override + public void close() { + this.reporter.stop(); + } + + @Override + public String generateName(String name, List<String> scope) { + StringBuilder sb = new StringBuilder(); + for (String s : scope) { + sb.append(s); + sb.append('.'); + } + sb.append(name); + return sb.toString(); + } + + @Override + public synchronized void report() { + this.reporter.report( + this.registry.getGauges(), + this.registry.getCounters(), + this.registry.getHistograms(), + this.registry.getMeters(), + this.registry.getTimers()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java new file mode 100644 index 0000000..f6630b9 --- /dev/null +++ b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.dropwizard.metrics; + +import org.apache.flink.metrics.Counter; + +public class CounterWrapper extends com.codahale.metrics.Counter { + private final Counter counter; + + public CounterWrapper(Counter counter) { + this.counter = counter; + } + + @Override + public long getCount() { + return this.counter.getCount(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java new file mode 100644 index 0000000..fcb629a --- /dev/null +++ b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.dropwizard.metrics; + +import org.apache.flink.metrics.Gauge; + +public class GaugeWrapper<T> implements com.codahale.metrics.Gauge<T> { + private final Gauge<T> gauge; + + public GaugeWrapper(Gauge<T> gauge) { + this.gauge = gauge; + } + + @Override + public T getValue() { + return this.gauge.getValue(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-ganglia/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-ganglia/pom.xml b/flink-metric-reporters/flink-metrics-ganglia/pom.xml new file mode 100644 index 0000000..a457ca1 --- /dev/null +++ b/flink-metric-reporters/flink-metrics-ganglia/pom.xml @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metric-reporters</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-metrics-ganglia</artifactId> + <name>flink-metrics-ganglia</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-dropwizard</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>info.ganglia.gmetric4j</groupId> + <artifactId>gmetric4j</artifactId> + <version>1.0.7</version> + </dependency> + + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${metrics.version}</version> + </dependency> + + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-ganglia</artifactId> + <version>${metrics.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.4</version> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java new file mode 100644 index 0000000..a1dafc9 --- /dev/null +++ b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.graphite; + +import com.codahale.metrics.ScheduledReporter; +import info.ganglia.gmetric4j.gmetric.GMetric; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.dropwizard.ScheduledDropwizardReporter; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class GangliaReporter extends ScheduledDropwizardReporter { + public static final String ARG_DMAX = "dmax"; + public static final String ARG_TMAX = "tmax"; + public static final String ARG_TTL = "ttl"; + public static final String ARG_MODE_ADDRESSING = "addressingMode"; + + @Override + public ScheduledReporter getReporter(Configuration config) { + + try { + String host = config.getString(ARG_HOST, null); + int port = config.getInteger(ARG_PORT, -1); + if (host == null || host.length() == 0 || port < 1) { + throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); + } + String addressingMode = config.getString(ARG_MODE_ADDRESSING, "MULTICAST"); + int ttl = config.getInteger(ARG_TTL, -1); + GMetric gMetric = new GMetric(host, port, GMetric.UDPAddressingMode.valueOf(addressingMode), ttl); + + String prefix = config.getString(ARG_PREFIX, null); + String conversionRate = config.getString(ARG_CONVERSION_RATE, null); + String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null); + int dMax = config.getInteger(ARG_DMAX, 0); + int tMax = config.getInteger(ARG_TMAX, 60); + + com.codahale.metrics.ganglia.GangliaReporter.Builder builder = + com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry); + + if (prefix != null) { + builder.prefixedWith(prefix); + } + if (conversionRate != null) { + builder.convertRatesTo(TimeUnit.valueOf(conversionRate)); + } + if (conversionDuration != null) { + builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration)); + } + builder.withDMax(dMax); + builder.withTMax(tMax); + + return builder.build(gMetric); + } catch (IOException e) { + throw new RuntimeException("Error while instantiating GangliaReporter.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-graphite/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-graphite/pom.xml b/flink-metric-reporters/flink-metrics-graphite/pom.xml new file mode 100644 index 0000000..714b77f --- /dev/null +++ b/flink-metric-reporters/flink-metrics-graphite/pom.xml @@ -0,0 +1,84 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metric-reporters</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-metrics-graphite</artifactId> + <name>flink-metrics-graphite</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-dropwizard</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${metrics.version}</version> + </dependency> + + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-graphite</artifactId> + <version>${metrics.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.4</version> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java new file mode 100644 index 0000000..b28d7a4 --- /dev/null +++ b/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.graphite; + +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.graphite.Graphite; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.dropwizard.ScheduledDropwizardReporter; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class GraphiteReporter extends ScheduledDropwizardReporter { + @Override + public ScheduledReporter getReporter(Configuration config) { + String host = config.getString(ARG_HOST, null); + int port = config.getInteger(ARG_PORT, -1); + + if (host == null || host.length() == 0 || port < 1) { + throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); + } + + String prefix = config.getString(ARG_PREFIX, null); + String conversionRate = config.getString(ARG_CONVERSION_RATE, null); + String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null); + + com.codahale.metrics.graphite.GraphiteReporter.Builder builder = + com.codahale.metrics.graphite.GraphiteReporter.forRegistry(registry); + + if (prefix != null) { + builder.prefixedWith(prefix); + } + + if (conversionRate != null) { + builder.convertRatesTo(TimeUnit.valueOf(conversionRate)); + } + + if (conversionDuration != null) { + builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration)); + } + + return builder.build(new Graphite(host, port)); + } + + @Override + public String generateName(String name, List<String> scope) { + StringBuilder sb = new StringBuilder(); + for (String s : scope) { + sb.append(s.replace(".", "_").replace("\"", "")); + sb.append("."); + } + sb.append(name); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-statsd/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-statsd/pom.xml b/flink-metric-reporters/flink-metrics-statsd/pom.xml new file mode 100644 index 0000000..3052a10 --- /dev/null +++ b/flink-metric-reporters/flink-metrics-statsd/pom.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metric-reporters</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-metrics-statsd</artifactId> + <name>flink-metrics-statsd</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java new file mode 100644 index 0000000..e57001f --- /dev/null +++ b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.statsd; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Largely based on the StatsDReporter class by ReadyTalk + * https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java + * + * Ported since it was not present in maven central. + */ +public class StatsDReporter extends AbstractReporter implements Scheduled { + private static final Logger LOG = LoggerFactory.getLogger(StatsDReporter.class); + + public static final String ARG_HOST = "host"; + public static final String ARG_PORT = "port"; + public static final String ARG_CONVERSION_RATE = "rateConversion"; + public static final String ARG_CONVERSION_DURATION = "durationConversion"; + + private DatagramSocket socket; + private InetSocketAddress address; + + private double durationFactor; + private double rateFactor; + + @Override + public void open(Configuration config) { + String host = config.getString(ARG_HOST, null); + int port = config.getInteger(ARG_PORT, -1); + + if (host == null || host.length() == 0 || port < 1) { + throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); + } + + String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS"); + String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS"); + + this.address = new InetSocketAddress(host, port); + this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1); + this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1); + try { + this.socket = new DatagramSocket(0); + } catch (SocketException e) { + throw new RuntimeException("Failure while creating socket. ", e); + } + } + + @Override + public void close() { + if (socket != null && !socket.isClosed()) { + socket.close(); + } + } + + @Override + public String generateName(String name, List<String> scope) { + StringBuilder sb = new StringBuilder(); + for (String s : scope) { + sb.append(s); + sb.append('.'); + } + sb.append(name); + return sb.toString(); + } + + public void send(final String name, final double value) { + send(name, "" + value); + } + + public void send(final String name, final String value) { + try { + String formatted = String.format("%s:%s|g", name, value); + byte[] data = formatted.getBytes(); + socket.send(new DatagramPacket(data, data.length, this.address)); + } catch (IOException e) { + LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort()); + } + } + + @Override + public void report() { + for (Map.Entry<String, Gauge<?>> entry : gauges.entrySet()) { + reportGauge(entry.getKey(), entry.getValue()); + } + + for (Map.Entry<String, Counter> entry : counters.entrySet()) { + reportCounter(entry.getKey(), entry.getValue()); + } + } + + private void reportCounter(final String name, final Counter counter) { + send(name, counter.getCount()); + } + + private void reportGauge(final String name, final Gauge<?> gauge) { + final String value = gauge.getValue().toString(); + if (value != null) { + send((name), value); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metric-reporters/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/pom.xml b/flink-metric-reporters/pom.xml new file mode 100644 index 0000000..01a809c --- /dev/null +++ b/flink-metric-reporters/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-metric-reporters</artifactId> + <name>flink-metric-reporters</name> + <packaging>pom</packaging> + + <modules> + <module>flink-metrics-dropwizard</module> + <module>flink-metrics-ganglia</module> + <module>flink-metrics-graphite</module> + <module>flink-metrics-statsd</module> + </modules> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-dropwizard/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/pom.xml b/flink-metrics/flink-metrics-dropwizard/pom.xml deleted file mode 100644 index 84d9722..0000000 --- a/flink-metrics/flink-metrics-dropwizard/pom.xml +++ /dev/null @@ -1,72 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics</artifactId> - <version>1.1-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-metrics-dropwizard</artifactId> - <name>flink-metrics-dropwizard</name> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>${metrics.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>2.4</version> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - </configuration> - <executions> - <execution> - <id>make-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java deleted file mode 100644 index a7309be..0000000 --- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.dropwizard; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.ScheduledReporter; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.dropwizard.metrics.CounterWrapper; -import org.apache.flink.dropwizard.metrics.GaugeWrapper; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.reporter.MetricReporter; -import org.apache.flink.metrics.reporter.Scheduled; - -import java.util.List; - -/** - * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} that wraps a - * Dropwizard {@link com.codahale.metrics.Reporter}. - */ -@PublicEvolving -public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled { - protected MetricRegistry registry; - protected ScheduledReporter reporter; - - public static final String ARG_HOST = "host"; - public static final String ARG_PORT = "port"; - public static final String ARG_PREFIX = "prefix"; - public static final String ARG_CONVERSION_RATE = "rateConversion"; - public static final String ARG_CONVERSION_DURATION = "durationConversion"; - - protected ScheduledDropwizardReporter() { - this.registry = new MetricRegistry(); - } - - @Override - public synchronized void notifyOfAddedMetric(Metric metric, String name) { - if (metric instanceof Counter) { - registry.register(name, new CounterWrapper((Counter) metric)); - } else if (metric instanceof Gauge) { - registry.register(name, new GaugeWrapper((Gauge) metric)); - } - } - - @Override - public synchronized void notifyOfRemovedMetric(Metric metric, String name) { - registry.remove(name); - } - - public abstract ScheduledReporter getReporter(Configuration config); - - @Override - public void open(Configuration config) { - this.reporter = getReporter(config); - } - - @Override - public void close() { - this.reporter.stop(); - } - - @Override - public String generateName(String name, List<String> scope) { - StringBuilder sb = new StringBuilder(); - for (String s : scope) { - sb.append(s); - sb.append('.'); - } - sb.append(name); - return sb.toString(); - } - - @Override - public synchronized void report() { - this.reporter.report( - this.registry.getGauges(), - this.registry.getCounters(), - this.registry.getHistograms(), - this.registry.getMeters(), - this.registry.getTimers()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java deleted file mode 100644 index f6630b9..0000000 --- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/CounterWrapper.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.dropwizard.metrics; - -import org.apache.flink.metrics.Counter; - -public class CounterWrapper extends com.codahale.metrics.Counter { - private final Counter counter; - - public CounterWrapper(Counter counter) { - this.counter = counter; - } - - @Override - public long getCount() { - return this.counter.getCount(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java deleted file mode 100644 index d47090d..0000000 --- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.dropwizard.metrics; - -import org.apache.flink.metrics.Gauge; - -public class GaugeWrapper implements com.codahale.metrics.Gauge { - private final Gauge gauge; - - public GaugeWrapper(Gauge gauge) { - this.gauge = gauge; - } - - @Override - public Object getValue() { - return this.gauge.getValue(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-ganglia/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-ganglia/pom.xml b/flink-metrics/flink-metrics-ganglia/pom.xml deleted file mode 100644 index c4f51da..0000000 --- a/flink-metrics/flink-metrics-ganglia/pom.xml +++ /dev/null @@ -1,90 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics</artifactId> - <version>1.1-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-metrics-ganglia</artifactId> - <name>flink-metrics-ganglia</name> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-dropwizard</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>info.ganglia.gmetric4j</groupId> - <artifactId>gmetric4j</artifactId> - <version>1.0.7</version> - </dependency> - - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>${metrics.version}</version> - </dependency> - - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-ganglia</artifactId> - <version>${metrics.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>2.4</version> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - </configuration> - <executions> - <execution> - <id>make-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java b/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java deleted file mode 100644 index a1dafc9..0000000 --- a/flink-metrics/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.graphite; - -import com.codahale.metrics.ScheduledReporter; -import info.ganglia.gmetric4j.gmetric.GMetric; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.dropwizard.ScheduledDropwizardReporter; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -public class GangliaReporter extends ScheduledDropwizardReporter { - public static final String ARG_DMAX = "dmax"; - public static final String ARG_TMAX = "tmax"; - public static final String ARG_TTL = "ttl"; - public static final String ARG_MODE_ADDRESSING = "addressingMode"; - - @Override - public ScheduledReporter getReporter(Configuration config) { - - try { - String host = config.getString(ARG_HOST, null); - int port = config.getInteger(ARG_PORT, -1); - if (host == null || host.length() == 0 || port < 1) { - throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); - } - String addressingMode = config.getString(ARG_MODE_ADDRESSING, "MULTICAST"); - int ttl = config.getInteger(ARG_TTL, -1); - GMetric gMetric = new GMetric(host, port, GMetric.UDPAddressingMode.valueOf(addressingMode), ttl); - - String prefix = config.getString(ARG_PREFIX, null); - String conversionRate = config.getString(ARG_CONVERSION_RATE, null); - String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null); - int dMax = config.getInteger(ARG_DMAX, 0); - int tMax = config.getInteger(ARG_TMAX, 60); - - com.codahale.metrics.ganglia.GangliaReporter.Builder builder = - com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry); - - if (prefix != null) { - builder.prefixedWith(prefix); - } - if (conversionRate != null) { - builder.convertRatesTo(TimeUnit.valueOf(conversionRate)); - } - if (conversionDuration != null) { - builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration)); - } - builder.withDMax(dMax); - builder.withTMax(tMax); - - return builder.build(gMetric); - } catch (IOException e) { - throw new RuntimeException("Error while instantiating GangliaReporter.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-graphite/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-graphite/pom.xml b/flink-metrics/flink-metrics-graphite/pom.xml deleted file mode 100644 index 45fb018..0000000 --- a/flink-metrics/flink-metrics-graphite/pom.xml +++ /dev/null @@ -1,84 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics</artifactId> - <version>1.1-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-metrics-graphite</artifactId> - <name>flink-metrics-graphite</name> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-dropwizard</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>${metrics.version}</version> - </dependency> - - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-graphite</artifactId> - <version>${metrics.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>2.4</version> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - </configuration> - <executions> - <execution> - <id>make-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java deleted file mode 100644 index b28d7a4..0000000 --- a/flink-metrics/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.graphite; - -import com.codahale.metrics.ScheduledReporter; -import com.codahale.metrics.graphite.Graphite; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.dropwizard.ScheduledDropwizardReporter; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class GraphiteReporter extends ScheduledDropwizardReporter { - @Override - public ScheduledReporter getReporter(Configuration config) { - String host = config.getString(ARG_HOST, null); - int port = config.getInteger(ARG_PORT, -1); - - if (host == null || host.length() == 0 || port < 1) { - throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); - } - - String prefix = config.getString(ARG_PREFIX, null); - String conversionRate = config.getString(ARG_CONVERSION_RATE, null); - String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null); - - com.codahale.metrics.graphite.GraphiteReporter.Builder builder = - com.codahale.metrics.graphite.GraphiteReporter.forRegistry(registry); - - if (prefix != null) { - builder.prefixedWith(prefix); - } - - if (conversionRate != null) { - builder.convertRatesTo(TimeUnit.valueOf(conversionRate)); - } - - if (conversionDuration != null) { - builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration)); - } - - return builder.build(new Graphite(host, port)); - } - - @Override - public String generateName(String name, List<String> scope) { - StringBuilder sb = new StringBuilder(); - for (String s : scope) { - sb.append(s.replace(".", "_").replace("\"", "")); - sb.append("."); - } - sb.append(name); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-statsd/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-statsd/pom.xml b/flink-metrics/flink-metrics-statsd/pom.xml deleted file mode 100644 index 5d63908..0000000 --- a/flink-metrics/flink-metrics-statsd/pom.xml +++ /dev/null @@ -1,43 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics</artifactId> - <version>1.1-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-metrics-statsd</artifactId> - <name>flink-metrics-statsd</name> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java deleted file mode 100644 index 288b8b8..0000000 --- a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.statsd; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.reporter.AbstractReporter; -import org.apache.flink.metrics.reporter.Scheduled; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.SocketException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -/** - * Largely based on the StatsDReporter class by ReadyTalk - * https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics3-statsd/src/main/java/com/readytalk/metrics/StatsDReporter.java - * - * Ported since it was not present in maven central. - */ -public class StatsDReporter extends AbstractReporter implements Scheduled { - private static final Logger LOG = LoggerFactory.getLogger(StatsDReporter.class); - - public static final String ARG_HOST = "host"; - public static final String ARG_PORT = "port"; - public static final String ARG_CONVERSION_RATE = "rateConversion"; - public static final String ARG_CONVERSION_DURATION = "durationConversion"; - - private DatagramSocket socket; - private InetSocketAddress address; - - private double durationFactor; - private double rateFactor; - - @Override - public void open(Configuration config) { - String host = config.getString(ARG_HOST, null); - int port = config.getInteger(ARG_PORT, -1); - - if (host == null || host.length() == 0 || port < 1) { - throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); - } - - String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS"); - String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS"); - - this.address = new InetSocketAddress(host, port); - this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1); - this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1); - try { - this.socket = new DatagramSocket(0); - } catch (SocketException e) { - throw new RuntimeException("Failure while creating socket. ", e); - } - } - - @Override - public void close() { - if (socket != null && !socket.isClosed()) { - socket.close(); - } - } - - @Override - public String generateName(String name, List<String> scope) { - StringBuilder sb = new StringBuilder(); - for (String s : scope) { - sb.append(s); - sb.append('.'); - } - sb.append(name); - return sb.toString(); - } - - public void send(final String name, final double value) { - send(name, "" + value); - } - - public void send(final String name, final String value) { - try { - String formatted = String.format("%s:%s|g", name, value); - byte[] data = formatted.getBytes(); - socket.send(new DatagramPacket(data, data.length, this.address)); - } catch (IOException e) { - LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort()); - } - } - - @Override - public void report() { - for (Map.Entry<String, Gauge> entry : gauges.entrySet()) { - reportGauge(entry.getKey(), entry.getValue()); - } - - for (Map.Entry<String, Counter> entry : counters.entrySet()) { - reportCounter(entry.getKey(), entry.getValue()); - } - } - - private void reportCounter(final String name, final Counter counter) { - send(name, counter.getCount()); - } - - private void reportGauge(final String name, final Gauge<?> gauge) { - final String value = gauge.getValue().toString(); - if (value != null) { - send((name), value); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-metrics/pom.xml ---------------------------------------------------------------------- diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml deleted file mode 100644 index 542f49c..0000000 --- a/flink-metrics/pom.xml +++ /dev/null @@ -1,42 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-parent</artifactId> - <version>1.1-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-metrics</artifactId> - <name>flink-metrics</name> - <packaging>pom</packaging> - - <modules> - <module>flink-metrics-dropwizard</module> - <module>flink-metrics-ganglia</module> - <module>flink-metrics-graphite</module> - <module>flink-metrics-statsd</module> - </modules> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index bc3ba0d..1f766e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -60,6 +60,7 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateUtils; import org.apache.flink.util.SerializedValue; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,15 +92,15 @@ import static com.google.common.base.Preconditions.checkNotNull; * {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable} have only data * readers, -writers, and certain event callbacks. The task connects those to the * network stack and actor messages, and tracks the state of the execution and - * handles exceptions.</p> + * handles exceptions. * * <p>Tasks have no knowledge about how they relate to other tasks, or whether they * are the first attempt to execute the task, or a repeated attempt. All of that * is only known to the JobManager. All the task knows are its own runnable code, * the task's configuration, and the IDs of the intermediate results to consume and - * produce (if any).</p> + * produce (if any). * - * <p>Each Task is run by one dedicated thread.</p> + * <p>Each Task is run by one dedicated thread. */ public class Task implements Runnable { @@ -129,6 +130,7 @@ public class Task implements Runnable { /** TaskInfo object for this task */ private final TaskInfo taskInfo; + /** The name of the task, including subtask indexes */ private final String taskNameWithSubtask; /** The job-wide configuration object */ @@ -158,6 +160,9 @@ public class Task implements Runnable { /** The BroadcastVariableManager to be used by this task */ private final BroadcastVariableManager broadcastVariableManager; + /** Serialized version of the job specific execution configuration (see {@link ExecutionConfig}). */ + private final SerializedValue<ExecutionConfig> serializedExecutionConfig; + private final ResultPartition[] producedPartitions; private final ResultPartitionWriter[] writers; @@ -193,6 +198,9 @@ public class Task implements Runnable { /** The thread that executes the task */ private final Thread executingThread; + /** Parent group for all metrics of this task */ + private final TaskMetricGroup metrics; + // ------------------------------------------------------------------------ // Fields that control the task execution. All these fields are volatile // (which means that they introduce memory barriers), to establish @@ -202,8 +210,6 @@ public class Task implements Runnable { /** atomic flag that makes sure the invokable is canceled exactly once upon error */ private final AtomicBoolean invokableHasBeenCanceled; - private final TaskMetricGroup metrics; - /** The invokable of this task, if initialized */ private volatile AbstractInvokable invokable; @@ -215,16 +221,13 @@ public class Task implements Runnable { /** Serial executor for asynchronous calls (checkpoints, etc), lazily initialized */ private volatile ExecutorService asyncCallDispatcher; - + /** The handle to the state that the operator was initialized with. Will be set to null after the * initialization, to be memory friendly */ private volatile SerializedValue<StateHandle<?>> operatorState; private volatile long recoveryTs; - /** Serialized version of the job specific execution configuration (see {@link ExecutionConfig}). */ - private final SerializedValue<ExecutionConfig> serializedExecutionConfig; - /** Initialized from the Flink configuration. May also be set at the ExecutionConfig */ private long taskCancellationInterval; @@ -688,9 +691,6 @@ public class Task implements Runnable { // remove all of the tasks library resources libraryCache.unregisterTask(jobId, executionId); - - //Uncomment before Merging!!! - //metrics.close(); // remove all files in the distributed cache removeCachedFiles(distributedCacheEntries, fileCache); @@ -703,6 +703,16 @@ public class Task implements Runnable { LOG.error(message, t); notifyFatalError(message, t); } + + // un-register the metrics at the end so that the task may already be + // counted as finished when this happens + // errors here will only be logged + try { + metrics.close(); + } + catch (Throwable t) { + LOG.error("Error during metrics de-registration", t); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index beb012c..a5cc18d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -155,8 +155,8 @@ class TaskManager( /** Registry of metrics periodically transmitted to the JobManager */ private val metricRegistry = TaskManager.createMetricsRegistry() - private var metricsRegistry : FlinkMetricRegistry = null - private var taskManagerMetricGroup : TaskManagerMetricGroup = null + private var metricsRegistry : FlinkMetricRegistry = _ + private var taskManagerMetricGroup : TaskManagerMetricGroup = _ /** Metric serialization */ private val metricRegistryMapper: ObjectMapper = new ObjectMapper() @@ -938,7 +938,7 @@ class TaskManager( libraryCacheManager = Some(new FallbackLibraryCacheManager) } - metricsRegistry = new FlinkMetricRegistry(this.config.configuration); + metricsRegistry = new FlinkMetricRegistry(config.configuration) taskManagerMetricGroup = new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString) @@ -1011,6 +1011,10 @@ class TaskManager( // disassociate the network environment network.disassociate() + + // stop the metrics reporters + metricsRegistry.shutdown() + metricsRegistry = null } protected def handleJobManagerDisconnect(jobManager: ActorRef, msg: String): Unit = { @@ -1085,8 +1089,9 @@ class TaskManager( } val taskMetricGroup = taskManagerMetricGroup - .addJob(tdd.getJobID, jobName) - .addTask(tdd.getVertexID, tdd.getExecutionId, tdd.getIndexInSubtaskGroup, tdd.getTaskName) + .addTaskForJob( + tdd.getJobID, jobName, + tdd.getVertexID, tdd.getExecutionId, tdd.getIndexInSubtaskGroup, tdd.getTaskName) val task = new Task( tdd, @@ -1224,16 +1229,16 @@ class TaskManager( registry.getSnapshot } - self ! decorateMessage( - UpdateTaskExecutionState( - new TaskExecutionState( - task.getJobID, - task.getExecutionId, - task.getExecutionState, - task.getFailureCause, - accumulators) - ) + self ! decorateMessage( + UpdateTaskExecutionState( + new TaskExecutionState( + task.getJobID, + task.getExecutionId, + task.getExecutionState, + task.getFailureCause, + accumulators) ) + ) } else { log.error(s"Cannot find task with ID $executionID to unregister.") http://git-wip-us.apache.org/repos/asf/flink/blob/707606ac/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c97815a..784aa40 100644 --- a/pom.xml +++ b/pom.xml @@ -74,7 +74,7 @@ under the License. <module>flink-quickstart</module> <module>flink-contrib</module> <module>flink-dist</module> - <module>flink-metrics</module> + <module>flink-metric-reporters</module> </modules> <properties>
