http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java deleted file mode 100644 index 23a7768..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyReporter.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.util; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.reporter.MetricReporter; - -import java.util.List; - -public class DummyReporter implements MetricReporter { - @Override - public void open(Configuration config) { - } - - @Override - public void close() { - } - - @Override - public void notifyOfAddedMetric(Metric metric, String name) { - } - - @Override - public void notifyOfRemovedMetric(Metric metric, String name) { - } - - @Override - public String generateName(String name, List<String> scope) { - return ""; - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java deleted file mode 100644 index 8b7714f..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskManagerMetricGroup.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.util; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.TaskManagerMetricGroup; - -public class DummyTaskManagerMetricGroup extends TaskManagerMetricGroup { - - public DummyTaskManagerMetricGroup() { - super(new DummyMetricRegistry(), "host", "id"); - } - - public DummyJobMetricGroup addJob(JobID id, String name) { - return new DummyJobMetricGroup(); - } - - @Override - protected void addMetric(String name, Metric metric) {} - - @Override - public MetricGroup addGroup(String name) { - return new DummyMetricGroup(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java deleted file mode 100644 index db2c557..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyTaskMetricGroup.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.util; - -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.TaskMetricGroup; -import org.apache.flink.util.AbstractID; - -public class DummyTaskMetricGroup extends TaskMetricGroup { - - public DummyTaskMetricGroup() { - super(new DummyMetricRegistry(), new DummyJobMetricGroup(), new AbstractID(), new AbstractID(), 0, "task"); - } - - public DummyOperatorMetricGroup addOperator(String name) { - return new DummyOperatorMetricGroup(); - } - - @Override - protected void addMetric(String name, Metric metric) {} - - @Override - public MetricGroup addGroup(String name) { - return new DummyMetricGroup(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java index 5d8a8e0..0c139f0 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/util/TestReporter.java @@ -19,10 +19,10 @@ package org.apache.flink.metrics.util; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.groups.AbstractMetricGroup; import org.apache.flink.metrics.reporter.AbstractReporter; -import java.util.List; - public class TestReporter extends AbstractReporter { @Override @@ -32,7 +32,8 @@ public class TestReporter extends AbstractReporter { public void close() {} @Override - public String generateName(String name, List<String> scope) { - return name; - } + public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {} + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {} } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java index 31b3ba2..2682584 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java @@ -31,8 +31,9 @@ import org.apache.flink.api.java.tuple.builder.Tuple2Builder; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.util.Collector; + import org.junit.Assert; import org.junit.Test; @@ -77,7 +78,8 @@ public class CoGroupOperatorCollectionTest implements Serializable { final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>(); final HashMap<String, Future<Path>> cpTasks = new HashMap<>(); final TaskInfo taskInfo = new TaskInfo("Test UDF", 0, 4, 0); - final RuntimeContext ctx = new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulators, new DummyMetricGroup()); + final RuntimeContext ctx = new RuntimeUDFContext( + taskInfo, null, executionConfig, cpTasks, accumulators, new UnregisteredMetricsGroup()); { SumCoGroup udf1 = new SumCoGroup(); http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java index 1b627c4..c5a247a 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java @@ -30,7 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.util.Collector; import org.junit.Test; @@ -169,9 +169,20 @@ public class GroupReduceOperatorTest implements java.io.Serializable { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); + List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, + new RuntimeUDFContext(taskInfo, null, executionConfig, + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup()), + executionConfig); + executionConfig.enableObjectReuse(); - List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); + List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, + new RuntimeUDFContext(taskInfo, null, executionConfig, + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup()), + executionConfig); Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe); http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java index 1d5668b..89574a8 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java @@ -18,8 +18,6 @@ package org.apache.flink.api.common.operators.base; -import static org.junit.Assert.*; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; @@ -30,8 +28,9 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.util.Collector; + import org.junit.Test; import java.io.Serializable; @@ -43,6 +42,9 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Future; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + @SuppressWarnings({ "unchecked", "serial" }) public class InnerJoinOperatorBaseTest implements Serializable { @@ -107,10 +109,22 @@ public class InnerJoinOperatorBaseTest implements Serializable { try { final TaskInfo taskInfo = new TaskInfo("op", 0, 1, 0); ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); - List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); + List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, + new RuntimeUDFContext(taskInfo, null, executionConfig, + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup()), + executionConfig); + executionConfig.enableObjectReuse(); - List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); + List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, + new RuntimeUDFContext(taskInfo, null, executionConfig, + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup()), + executionConfig); assertEquals(expected, new HashSet<>(resultSafe)); assertEquals(expected, new HashSet<>(resultRegular)); http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java index 4317c03..150854d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java @@ -30,7 +30,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; + import org.junit.Test; import java.util.ArrayList; @@ -42,7 +43,9 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Arrays.asList; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @SuppressWarnings({"serial", "unchecked"}) public class ReduceOperatorTest implements java.io.Serializable { @@ -145,10 +148,22 @@ public class ReduceOperatorTest implements java.io.Serializable { final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0); ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); - List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); + List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, + new RuntimeUDFContext(taskInfo, null, executionConfig, + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup()), + executionConfig); + executionConfig.enableObjectReuse(); - List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); + List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, + new RuntimeUDFContext(taskInfo, null, executionConfig, + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup()), + executionConfig); Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe); Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java index 059704d..74fdb85 100644 --- a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java +++ b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java @@ -15,10 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.dropwizard; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reporter; import com.codahale.metrics.ScheduledReporter; + import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; import org.apache.flink.dropwizard.metrics.CounterWrapper; @@ -26,19 +29,19 @@ import org.apache.flink.dropwizard.metrics.GaugeWrapper; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.groups.AbstractMetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; -import java.util.List; +import java.util.HashMap; +import java.util.Map; /** * Base class for {@link org.apache.flink.metrics.reporter.MetricReporter} that wraps a * Dropwizard {@link com.codahale.metrics.Reporter}. */ @PublicEvolving -public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled { - protected MetricRegistry registry; - protected ScheduledReporter reporter; +public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled, Reporter { public static final String ARG_HOST = "host"; public static final String ARG_PORT = "port"; @@ -46,25 +49,24 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch public static final String ARG_CONVERSION_RATE = "rateConversion"; public static final String ARG_CONVERSION_DURATION = "durationConversion"; - protected ScheduledDropwizardReporter() { - this.registry = new MetricRegistry(); - } + // ------------------------------------------------------------------------ - @Override - public synchronized void notifyOfAddedMetric(Metric metric, String name) { - if (metric instanceof Counter) { - registry.register(name, new CounterWrapper((Counter) metric)); - } else if (metric instanceof Gauge) { - registry.register(name, new GaugeWrapper((Gauge<?>) metric)); - } - } + protected final MetricRegistry registry; - @Override - public synchronized void notifyOfRemovedMetric(Metric metric, String name) { - registry.remove(name); + protected ScheduledReporter reporter; + + private final Map<Gauge<?>, String> gauges = new HashMap<>(); + private final Map<Counter, String> counters = new HashMap<>(); + + // ------------------------------------------------------------------------ + + protected ScheduledDropwizardReporter() { + this.registry = new MetricRegistry(); } - public abstract ScheduledReporter getReporter(Configuration config); + // ------------------------------------------------------------------------ + // life cycle + // ------------------------------------------------------------------------ @Override public void open(Configuration config) { @@ -76,24 +78,60 @@ public abstract class ScheduledDropwizardReporter implements MetricReporter, Sch this.reporter.stop(); } + // ------------------------------------------------------------------------ + // adding / removing metrics + // ------------------------------------------------------------------------ + @Override - public String generateName(String name, List<String> scope) { - StringBuilder sb = new StringBuilder(); - for (String s : scope) { - sb.append(s); - sb.append('.'); + public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { + final String fullName = group.getScopeString() + '.' + metricName; + + synchronized (this) { + if (metric instanceof Counter) { + counters.put((Counter) metric, fullName); + registry.register(fullName, new CounterWrapper((Counter) metric)); + } + else if (metric instanceof Gauge) { + gauges.put((Gauge<?>) metric, fullName); + registry.register(fullName, GaugeWrapper.fromGauge((Gauge<?>) metric)); + } } - sb.append(name); - return sb.toString(); } @Override - public synchronized void report() { - this.reporter.report( - this.registry.getGauges(), - this.registry.getCounters(), - this.registry.getHistograms(), - this.registry.getMeters(), - this.registry.getTimers()); + public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) { + synchronized (this) { + String fullName; + + if (metric instanceof Counter) { + fullName = counters.remove(metric); + } else if (metric instanceof Gauge) { + fullName = gauges.remove(metric); + } else { + fullName = null; + } + + if (fullName != null) { + registry.remove(fullName); + } + } } + + // ------------------------------------------------------------------------ + // scheduled reporting + // ------------------------------------------------------------------------ + + @Override + public void report() { + synchronized (this) { + this.reporter.report( + this.registry.getGauges(), + this.registry.getCounters(), + this.registry.getHistograms(), + this.registry.getMeters(), + this.registry.getTimers()); + } + } + + public abstract ScheduledReporter getReporter(Configuration config); } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java index fcb629a..655cd60 100644 --- a/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java +++ b/flink-metric-reporters/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/GaugeWrapper.java @@ -15,11 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.dropwizard.metrics; import org.apache.flink.metrics.Gauge; public class GaugeWrapper<T> implements com.codahale.metrics.Gauge<T> { + private final Gauge<T> gauge; public GaugeWrapper(Gauge<T> gauge) { @@ -30,4 +32,10 @@ public class GaugeWrapper<T> implements com.codahale.metrics.Gauge<T> { public T getValue() { return this.gauge.getValue(); } + + public static <T> GaugeWrapper<T> fromGauge(Gauge<?> gauge) { + @SuppressWarnings("unchecked") + Gauge<T> typedGauge = (Gauge<T>) gauge; + return new GaugeWrapper<>(typedGauge); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java new file mode 100644 index 0000000..adf9394 --- /dev/null +++ b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/ganglia/GangliaReporter.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.ganglia; + +import com.codahale.metrics.ScheduledReporter; + +import info.ganglia.gmetric4j.gmetric.GMetric; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.dropwizard.ScheduledDropwizardReporter; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +@PublicEvolving +public class GangliaReporter extends ScheduledDropwizardReporter { + + public static final String ARG_DMAX = "dmax"; + public static final String ARG_TMAX = "tmax"; + public static final String ARG_TTL = "ttl"; + public static final String ARG_MODE_ADDRESSING = "addressingMode"; + + @Override + public ScheduledReporter getReporter(Configuration config) { + + try { + String host = config.getString(ARG_HOST, null); + int port = config.getInteger(ARG_PORT, -1); + if (host == null || host.length() == 0 || port < 1) { + throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); + } + String addressingMode = config.getString(ARG_MODE_ADDRESSING, "MULTICAST"); + int ttl = config.getInteger(ARG_TTL, -1); + GMetric gMetric = new GMetric(host, port, GMetric.UDPAddressingMode.valueOf(addressingMode), ttl); + + String prefix = config.getString(ARG_PREFIX, null); + String conversionRate = config.getString(ARG_CONVERSION_RATE, null); + String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null); + int dMax = config.getInteger(ARG_DMAX, 0); + int tMax = config.getInteger(ARG_TMAX, 60); + + com.codahale.metrics.ganglia.GangliaReporter.Builder builder = + com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry); + + if (prefix != null) { + builder.prefixedWith(prefix); + } + if (conversionRate != null) { + builder.convertRatesTo(TimeUnit.valueOf(conversionRate)); + } + if (conversionDuration != null) { + builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration)); + } + builder.withDMax(dMax); + builder.withTMax(tMax); + + return builder.build(gMetric); + } catch (IOException e) { + throw new RuntimeException("Error while instantiating GangliaReporter.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java b/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java deleted file mode 100644 index a1dafc9..0000000 --- a/flink-metric-reporters/flink-metrics-ganglia/src/main/java/org/apache/flink/metrics/graphite/GangliaReporter.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.graphite; - -import com.codahale.metrics.ScheduledReporter; -import info.ganglia.gmetric4j.gmetric.GMetric; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.dropwizard.ScheduledDropwizardReporter; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -public class GangliaReporter extends ScheduledDropwizardReporter { - public static final String ARG_DMAX = "dmax"; - public static final String ARG_TMAX = "tmax"; - public static final String ARG_TTL = "ttl"; - public static final String ARG_MODE_ADDRESSING = "addressingMode"; - - @Override - public ScheduledReporter getReporter(Configuration config) { - - try { - String host = config.getString(ARG_HOST, null); - int port = config.getInteger(ARG_PORT, -1); - if (host == null || host.length() == 0 || port < 1) { - throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); - } - String addressingMode = config.getString(ARG_MODE_ADDRESSING, "MULTICAST"); - int ttl = config.getInteger(ARG_TTL, -1); - GMetric gMetric = new GMetric(host, port, GMetric.UDPAddressingMode.valueOf(addressingMode), ttl); - - String prefix = config.getString(ARG_PREFIX, null); - String conversionRate = config.getString(ARG_CONVERSION_RATE, null); - String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null); - int dMax = config.getInteger(ARG_DMAX, 0); - int tMax = config.getInteger(ARG_TMAX, 60); - - com.codahale.metrics.ganglia.GangliaReporter.Builder builder = - com.codahale.metrics.ganglia.GangliaReporter.forRegistry(registry); - - if (prefix != null) { - builder.prefixedWith(prefix); - } - if (conversionRate != null) { - builder.convertRatesTo(TimeUnit.valueOf(conversionRate)); - } - if (conversionDuration != null) { - builder.convertDurationsTo(TimeUnit.valueOf(conversionDuration)); - } - builder.withDMax(dMax); - builder.withTMax(tMax); - - return builder.build(gMetric); - } catch (IOException e) { - throw new RuntimeException("Error while instantiating GangliaReporter.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java b/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java index b28d7a4..16be830 100644 --- a/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java +++ b/flink-metric-reporters/flink-metrics-graphite/src/main/java/org/apache/flink/metrics/graphite/GraphiteReporter.java @@ -15,17 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics.graphite; import com.codahale.metrics.ScheduledReporter; import com.codahale.metrics.graphite.Graphite; + +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; import org.apache.flink.dropwizard.ScheduledDropwizardReporter; -import java.util.List; import java.util.concurrent.TimeUnit; +@PublicEvolving public class GraphiteReporter extends ScheduledDropwizardReporter { + @Override public ScheduledReporter getReporter(Configuration config) { String host = config.getString(ARG_HOST, null); @@ -56,15 +60,4 @@ public class GraphiteReporter extends ScheduledDropwizardReporter { return builder.build(new Graphite(host, port)); } - - @Override - public String generateName(String name, List<String> scope) { - StringBuilder sb = new StringBuilder(); - for (String s : scope) { - sb.append(s.replace(".", "_").replace("\"", "")); - sb.append("."); - } - sb.append(name); - return sb.toString(); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java ---------------------------------------------------------------------- diff --git a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java index e57001f..124b21d 100644 --- a/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java +++ b/flink-metric-reporters/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java @@ -15,13 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics.statsd; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.reporter.AbstractReporter; import org.apache.flink.metrics.reporter.Scheduled; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,9 +33,7 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.SocketException; -import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; /** * Largely based on the StatsDReporter class by ReadyTalk @@ -40,20 +41,19 @@ import java.util.concurrent.TimeUnit; * * Ported since it was not present in maven central. */ +@PublicEvolving public class StatsDReporter extends AbstractReporter implements Scheduled { + private static final Logger LOG = LoggerFactory.getLogger(StatsDReporter.class); public static final String ARG_HOST = "host"; public static final String ARG_PORT = "port"; - public static final String ARG_CONVERSION_RATE = "rateConversion"; - public static final String ARG_CONVERSION_DURATION = "durationConversion"; +// public static final String ARG_CONVERSION_RATE = "rateConversion"; +// public static final String ARG_CONVERSION_DURATION = "durationConversion"; private DatagramSocket socket; private InetSocketAddress address; - private double durationFactor; - private double rateFactor; - @Override public void open(Configuration config) { String host = config.getString(ARG_HOST, null); @@ -63,16 +63,17 @@ public class StatsDReporter extends AbstractReporter implements Scheduled { throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); } - String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS"); - String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS"); - this.address = new InetSocketAddress(host, port); - this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1); - this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1); + +// String conversionRate = config.getString(ARG_CONVERSION_RATE, "SECONDS"); +// String conversionDuration = config.getString(ARG_CONVERSION_DURATION, "MILLISECONDS"); +// this.rateFactor = TimeUnit.valueOf(conversionRate).toSeconds(1); +// this.durationFactor = 1.0 / TimeUnit.valueOf(conversionDuration).toNanos(1); + try { this.socket = new DatagramSocket(0); } catch (SocketException e) { - throw new RuntimeException("Failure while creating socket. ", e); + throw new RuntimeException("Could not create datagram socket. ", e); } } @@ -83,50 +84,40 @@ public class StatsDReporter extends AbstractReporter implements Scheduled { } } - @Override - public String generateName(String name, List<String> scope) { - StringBuilder sb = new StringBuilder(); - for (String s : scope) { - sb.append(s); - sb.append('.'); - } - sb.append(name); - return sb.toString(); - } - - public void send(final String name, final double value) { - send(name, "" + value); - } - - public void send(final String name, final String value) { - try { - String formatted = String.format("%s:%s|g", name, value); - byte[] data = formatted.getBytes(); - socket.send(new DatagramPacket(data, data.length, this.address)); - } catch (IOException e) { - LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort()); - } - } + // ------------------------------------------------------------------------ @Override public void report() { - for (Map.Entry<String, Gauge<?>> entry : gauges.entrySet()) { - reportGauge(entry.getKey(), entry.getValue()); + for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) { + reportGauge(entry.getValue(), entry.getKey()); } - for (Map.Entry<String, Counter> entry : counters.entrySet()) { - reportCounter(entry.getKey(), entry.getValue()); + for (Map.Entry<Counter, String> entry : counters.entrySet()) { + reportCounter(entry.getValue(), entry.getKey()); } } + // ------------------------------------------------------------------------ + private void reportCounter(final String name, final Counter counter) { - send(name, counter.getCount()); + send(name, String.valueOf(counter.getCount())); } private void reportGauge(final String name, final Gauge<?> gauge) { - final String value = gauge.getValue().toString(); + Object value = gauge.getValue(); if (value != null) { - send((name), value); + send(name, value.toString()); + } + } + + private void send(final String name, final String value) { + try { + String formatted = String.format("%s:%s|g", name, value); + byte[] data = formatted.getBytes(); + socket.send(new DatagramPacket(data, data.length, this.address)); + } + catch (IOException e) { + LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort()); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index d711c47..ecf8d1a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1092,7 +1092,8 @@ class TaskManager( val taskMetricGroup = taskManagerMetricGroup .addTaskForJob( tdd.getJobID, jobName, - tdd.getVertexID, tdd.getExecutionId, tdd.getIndexInSubtaskGroup, tdd.getTaskName) + tdd.getVertexID, tdd.getExecutionId, tdd.getTaskName, + tdd.getIndexInSubtaskGroup, tdd.getAttemptNumber) val task = new Task( tdd, http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java index 15ad353..89cde95 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java @@ -26,7 +26,7 @@ import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemoryType; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; @@ -227,6 +227,6 @@ public class TestTaskContext<S, T> implements TaskContext<S, T> { @Override public MetricGroup getMetricGroup() { - return new DummyMetricGroup(); + return new UnregisteredMetricsGroup(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java index 6e9b817..766123f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java @@ -27,14 +27,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.Driver; -import org.apache.flink.runtime.operators.TaskContext; import org.apache.flink.runtime.operators.ResettableDriver; +import org.apache.flink.runtime.operators.TaskContext; import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; @@ -371,7 +371,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog @Override public MetricGroup getMetricGroup() { - return new DummyMetricGroup(); + return new UnregisteredMetricsGroup(); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index eb2a3a7..3a69fab 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -18,40 +18,41 @@ package org.apache.flink.runtime.operators.testutils; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; - import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.util.DummyMetricGroup; -import org.apache.flink.runtime.operators.Driver; -import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; -import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.runtime.testutils.recordutils.RecordComparator; -import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.TaskContext; +import org.apache.flink.runtime.operators.Driver; import org.apache.flink.runtime.operators.ResettableDriver; +import org.apache.flink.runtime.operators.TaskContext; import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.testutils.recordutils.RecordComparator; +import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; +import org.apache.flink.util.TestLogger; + import org.junit.After; +import org.junit.Assert; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + @RunWith(Parameterized.class) public class DriverTestBase<S extends Function> extends TestLogger implements TaskContext<S, Record> { @@ -368,7 +369,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Ta @Override public MetricGroup getMetricGroup() { - return new DummyMetricGroup(); + return new UnregisteredMetricsGroup(); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index 9b54383..78fb422 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -18,17 +18,12 @@ package org.apache.flink.runtime.operators.testutils; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.Future; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.groups.TaskMetricGroup; -import org.apache.flink.metrics.util.DummyTaskMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.execution.Environment; @@ -42,12 +37,17 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Future; + public class DummyEnvironment implements Environment { - private final TaskInfo taskInfo; private final JobID jobId = new JobID(); private final JobVertexID jobVertexId = new JobVertexID(); + private final ExecutionAttemptID executionId = new ExecutionAttemptID(); private final ExecutionConfig executionConfig = new ExecutionConfig(); + private final TaskInfo taskInfo; public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) { this.taskInfo = new TaskInfo(taskName, subTaskIndex, numSubTasks, 0); @@ -70,12 +70,12 @@ public class DummyEnvironment implements Environment { @Override public ExecutionAttemptID getExecutionId() { - return null; + return executionId; } @Override public Configuration getTaskConfiguration() { - return null; + return new Configuration(); } @Override @@ -85,12 +85,12 @@ public class DummyEnvironment implements Environment { @Override public TaskMetricGroup getMetricGroup() { - return new DummyTaskMetricGroup(); + return new UnregisteredTaskMetricsGroup(); } @Override public Configuration getJobConfiguration() { - return null; + return new Configuration(); } @Override @@ -115,7 +115,7 @@ public class DummyEnvironment implements Environment { @Override public ClassLoader getUserClassLoader() { - return null; + return getClass().getClassLoader(); } @Override @@ -134,12 +134,10 @@ public class DummyEnvironment implements Environment { } @Override - public void acknowledgeCheckpoint(long checkpointId) { - } + public void acknowledgeCheckpoint(long checkpointId) {} @Override - public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) { - } + public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {} @Override public ResultPartitionWriter getWriter(int index) { http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index b774b48..0220149 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.metrics.groups.TaskMetricGroup; -import org.apache.flink.metrics.util.DummyTaskMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.execution.Environment; @@ -216,7 +215,7 @@ public class MockEnvironment implements Environment { @Override public TaskMetricGroup getMetricGroup() { - return new DummyTaskMetricGroup(); + return new UnregisteredTaskMetricsGroup(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java new file mode 100644 index 0000000..a2edce2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.testutils; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.JobMetricGroup; +import org.apache.flink.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.UUID; + +public class UnregisteredTaskMetricsGroup extends TaskMetricGroup { + + private static final MetricRegistry EMPTY_REGISTRY = new MetricRegistry(new Configuration()); + + + public UnregisteredTaskMetricsGroup() { + super(EMPTY_REGISTRY, new DummyJobMetricGroup(), + new JobVertexID(), new ExecutionAttemptID(), "testtask", 0, 0); + } + + @Override + protected void addMetric(String name, Metric metric) {} + + @Override + public MetricGroup addGroup(String name) { + return new UnregisteredMetricsGroup(); + } + + // ------------------------------------------------------------------------ + + private static class DummyTaskManagerMetricsGroup extends TaskManagerMetricGroup { + + public DummyTaskManagerMetricsGroup() { + super(EMPTY_REGISTRY, "localhost", UUID.randomUUID().toString()); + } + } + + private static class DummyJobMetricGroup extends JobMetricGroup { + + public DummyJobMetricGroup() { + super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob"); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 812507f..eb087c6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -29,14 +29,14 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.util.DummyTaskMetricGroup; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -46,6 +46,7 @@ import org.apache.flink.util.Collector; import org.junit.After; import org.junit.Test; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -61,8 +62,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static org.mockito.Mockito.*; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"}) public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @@ -795,7 +803,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { final Environment env = mock(Environment.class); when(env.getTaskInfo()).thenReturn(new TaskInfo("Test task name", 0, 1, 0)); when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader()); - when(env.getMetricGroup()).thenReturn(new DummyTaskMetricGroup()); + when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup()); when(task.getEnvironment()).thenReturn(env); http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index c89ac50..af46513 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; @@ -34,12 +33,12 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.util.DummyTaskMetricGroup; import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -47,6 +46,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.junit.After; import org.junit.Test; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -964,11 +964,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { when(task.getName()).thenReturn("Test task name"); when(task.getExecutionConfig()).thenReturn(new ExecutionConfig()); - final Environment env = mock(Environment.class); - when(env.getTaskInfo()).thenReturn(new TaskInfo("Test task name", 0, 1, 0)); - when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader()); - when(env.getMetricGroup()).thenReturn(new DummyTaskMetricGroup()); - + final Environment env = new DummyEnvironment("Test task name", 1, 0); when(task.getEnvironment()).thenReturn(env); try { http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index b2d0196..f8c36de 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -26,7 +26,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.metrics.groups.TaskMetricGroup; -import org.apache.flink.metrics.util.DummyTaskMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.event.AbstractEvent; @@ -44,6 +43,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; @@ -308,7 +308,7 @@ public class StreamMockEnvironment implements Environment { @Override public TaskMetricGroup getMetricGroup() { - return new DummyTaskMetricGroup(); + return new UnregisteredTaskMetricsGroup(); } }
