http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java new file mode 100644 index 0000000..1451637 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.groups.scope; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A container for component scope formats. 
+ */ +@Internal +public class ScopeFormats { + + private final TaskManagerScopeFormat taskManagerFormat; + private final TaskManagerJobScopeFormat taskManagerJobFormat; + private final TaskScopeFormat taskFormat; + private final OperatorScopeFormat operatorFormat; + + // ------------------------------------------------------------------------ + + /** + * Creates all default scope formats. + */ + public ScopeFormats() { + this.taskManagerFormat = new TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT); + + this.taskManagerJobFormat = new TaskManagerJobScopeFormat( + ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, this.taskManagerFormat); + + this.taskFormat = new TaskScopeFormat( + ScopeFormat.DEFAULT_SCOPE_TASK_GROUP, this.taskManagerJobFormat); + + this.operatorFormat = new OperatorScopeFormat( + ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP, this.taskFormat); + } + + /** + * Creates all scope formats, based on the given scope format strings. + */ + public ScopeFormats( + String taskManagerFormat, + String taskManagerJobFormat, + String taskFormat, + String operatorFormat) + { + this.taskManagerFormat = new TaskManagerScopeFormat(taskManagerFormat); + this.taskManagerJobFormat = new TaskManagerJobScopeFormat(taskManagerJobFormat, this.taskManagerFormat); + this.taskFormat = new TaskScopeFormat(taskFormat, this.taskManagerJobFormat); + this.operatorFormat = new OperatorScopeFormat(operatorFormat, this.taskFormat); + } + + /** + * Creates a {@code ScopeFormats} with the given scope formats. 
+ */ + public ScopeFormats( + TaskManagerScopeFormat taskManagerFormat, + TaskManagerJobScopeFormat taskManagerJobFormat, + TaskScopeFormat taskFormat, + OperatorScopeFormat operatorFormat) + { + this.taskManagerFormat = checkNotNull(taskManagerFormat); + this.taskManagerJobFormat = checkNotNull(taskManagerJobFormat); + this.taskFormat = checkNotNull(taskFormat); + this.operatorFormat = checkNotNull(operatorFormat); + } + + // ------------------------------------------------------------------------ + + public TaskManagerScopeFormat getTaskManagerFormat() { + return this.taskManagerFormat; + } + + public TaskManagerJobScopeFormat getJobFormat() { + return this.taskManagerJobFormat; + } + + public TaskScopeFormat getTaskFormat() { + return this.taskFormat; + } + + public OperatorScopeFormat getOperatorFormat() { + return this.operatorFormat; + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java index 271e91a..f2e78bf 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java @@ -18,33 +18,45 @@ package org.apache.flink.metrics.reporter; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.groups.AbstractMetricGroup; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +/** + * Base interface for custom metric reporters. + */ +@PublicEvolving public abstract class AbstractReporter implements MetricReporter { - - protected Map<String, Gauge<?>> gauges = new ConcurrentHashMap<>(); - protected Map<String, Counter> counters = new ConcurrentHashMap<>(); + + protected final Map<Gauge<?>, String> gauges = new HashMap<>(); + protected final Map<Counter, String> counters = new HashMap<>(); @Override - public void notifyOfAddedMetric(Metric metric, String name) { - if (metric instanceof Counter) { - counters.put(name, (Counter) metric); - } else if (metric instanceof Gauge) { - gauges.put(name, (Gauge<?>) metric); + public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { + final String name = group.getScopeString() + '.' 
+ metricName; + + synchronized (this) { + if (metric instanceof Counter) { + counters.put((Counter) metric, name); + } else if (metric instanceof Gauge) { + gauges.put((Gauge<?>) metric, name); + } } } @Override - public void notifyOfRemovedMetric(Metric metric, String name) { - if (metric instanceof Counter) { - counters.remove(name); - } else if (metric instanceof Gauge) { - gauges.remove(name); + public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) { + synchronized (this) { + if (metric instanceof Counter) { + counters.remove(metric); + } else if (metric instanceof Gauge) { + gauges.remove(metric); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java index 69fa11a..71c80de 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.metrics.reporter; import org.apache.flink.annotation.Internal; @@ -22,46 +23,76 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; + +import org.apache.flink.metrics.groups.AbstractMetricGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.management.InstanceAlreadyExistsException; import javax.management.InstanceNotFoundException; -import javax.management.MBeanRegistrationException; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; import java.lang.management.ManagementFactory; -import java.util.List; +import java.util.HashMap; +import java.util.Map; /** - * {@link org.apache.flink.metrics.reporter.MetricReporter} that exports {@link org.apache.flink.metrics.Metric}s via JMX. + * {@link MetricReporter} that exports {@link Metric Metrics} via JMX. 
* * Largely based on the JmxReporter class of the dropwizard metrics library * https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java */ @Internal public class JMXReporter implements MetricReporter { - private static final Logger LOG = LoggerFactory.getLogger(JMXReporter.class); - - private MBeanServer mBeanServer; private static final String PREFIX = "org.apache.flink.metrics:"; private static final String KEY_PREFIX = "key"; + private static final Logger LOG = LoggerFactory.getLogger(JMXReporter.class); + + // ------------------------------------------------------------------------ + + /** The server where the management beans are registered and deregistered */ + private final MBeanServer mBeanServer; + + /** The names under which the registered metrics have been added to the MBeanServer */ + private final Map<Metric, ObjectName> registeredMetrics; + + /** + * Creates a new JMXReporter + */ public JMXReporter() { this.mBeanServer = ManagementFactory.getPlatformMBeanServer(); + this.registeredMetrics = new HashMap<>(); } + // ------------------------------------------------------------------------ + // life cycle + // ------------------------------------------------------------------------ + + @Override + public void open(Configuration config) {} + + @Override + public void close() {} + + // ------------------------------------------------------------------------ + // adding / removing metrics + // ------------------------------------------------------------------------ + @Override - public void notifyOfAddedMetric(Metric metric, String name) { + public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { + final String name = generateJmxName(metricName, group.getScopeComponents()); + AbstractBean jmxMetric; ObjectName jmxName; try { jmxName = new ObjectName(name); } catch (MalformedObjectNameException e) { - throw new IllegalArgumentException("Metric name did not 
conform to JMX ObjectName rules: " + name, e); + LOG.error("Metric name did not conform to JMX ObjectName rules: " + name, e); + return; } if (metric instanceof Gauge) { @@ -69,68 +100,137 @@ public class JMXReporter implements MetricReporter { } else if (metric instanceof Counter) { jmxMetric = new JmxCounter((Counter) metric); } else { - throw new IllegalArgumentException("Unknown metric type: " + metric.getClass()); + LOG.error("Unknown metric type: " + metric.getClass().getName()); + return; } try { - mBeanServer.registerMBean(jmxMetric, jmxName); - } catch (NotCompliantMBeanException e) { //implementation error on our side + synchronized (this) { + mBeanServer.registerMBean(jmxMetric, jmxName); + registeredMetrics.put(metric, jmxName); + } + } catch (NotCompliantMBeanException e) { + // implementation error on our side LOG.error("Metric did not comply with JMX MBean naming rules.", e); } catch (InstanceAlreadyExistsException e) { LOG.error("A metric with the name " + jmxName + " was already registered.", e); - } catch (MBeanRegistrationException e) { - LOG.error("Failed to register metric.", e); + } catch (Throwable t) { + LOG.error("Failed to register metric", t); } } @Override - public void notifyOfRemovedMetric(Metric metric, String name) { + public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) { try { - mBeanServer.unregisterMBean(new ObjectName(name)); - } catch (MBeanRegistrationException e) { - LOG.error("Un-registering metric failed.", e); - } catch (MalformedObjectNameException e) { - LOG.error("Un-registering metric failed due to invalid name.", e); + synchronized (this) { + final ObjectName jmxName = registeredMetrics.remove(metric); + + // remove the metric if it is known. 
if it is not known, ignore the request + if (jmxName != null) { + mBeanServer.unregisterMBean(jmxName); + } + } } catch (InstanceNotFoundException e) { - //alright then + // alright then + } catch (Throwable t) { + // never propagate exceptions - the metrics reporter should not affect the stability + // of the running system + LOG.error("Un-registering metric failed", t); } } - @Override - public void open(Configuration config) { - } + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ - @Override - public void close() { - } + static String generateJmxName(String metricName, String[] scopeComponents) { + final StringBuilder nameBuilder = new StringBuilder(128); + nameBuilder.append(PREFIX); - @Override - public String generateName(String name, List<String> origin) { - StringBuilder fullName = new StringBuilder(); - - fullName.append(PREFIX); - for (int x = 0; x < origin.size(); x++) { - fullName.append(KEY_PREFIX); - fullName.append(x); - fullName.append("="); - String value = origin.get(x); - value = value.replaceAll("\"", ""); - value = value.replaceAll(" ", "_"); - value = value.replaceAll("[,=;:?'*]", "-"); - fullName.append(value); - fullName.append(","); + for (int x = 0; x < scopeComponents.length; x++) { + // write keyX= + nameBuilder.append(KEY_PREFIX); + nameBuilder.append(x); + nameBuilder.append("="); + + // write scopeName component + nameBuilder.append(replaceInvalidChars(scopeComponents[x])); + nameBuilder.append(","); } - fullName.append("name=").append(name); - return fullName.toString(); - } + // write the name + nameBuilder.append("name=").append(replaceInvalidChars(metricName)); - public interface MetricMBean { + return nameBuilder.toString(); } + + /** + * Lightweight method to replace unsupported characters. 
+ * If the string does not contain any unsupported characters, this method creates no + * new string (and in fact no new objects at all). + * + * <p>Replacements: + * + * <ul> + * <li>{@code "} is removed</li> + * <li>{@code space} is replaced by {@code _} (underscore)</li> + * <li>{@code , = ; : ? ' *} are replaced by {@code -} (hyphen)</li> + * </ul> + */ + static String replaceInvalidChars(String str) { + char[] chars = null; + final int strLen = str.length(); + int pos = 0; + + for (int i = 0; i < strLen; i++) { + final char c = str.charAt(i); + switch (c) { + case '"': + // remove character by not moving cursor + if (chars == null) { + chars = str.toCharArray(); + } + break; - private abstract static class AbstractBean implements MetricMBean { + case ' ': + if (chars == null) { + chars = str.toCharArray(); + } + chars[pos++] = '_'; + break; + + case ',': + case '=': + case ';': + case ':': + case '?': + case '\'': + case '*': + if (chars == null) { + chars = str.toCharArray(); + } + chars[pos++] = '-'; + break; + + default: + if (chars != null) { + chars[pos] = c; + } + pos++; + } + } + + return chars == null ? 
str : new String(chars, 0, pos); } + // ------------------------------------------------------------------------ + // Interfaces and base classes for JMX beans + // ------------------------------------------------------------------------ + + public interface MetricMBean {} + + private abstract static class AbstractBean implements MetricMBean {} + public interface JmxCounterMBean extends MetricMBean { long getCount(); } @@ -153,7 +253,7 @@ public class JMXReporter implements MetricReporter { } private static class JmxGauge extends AbstractBean implements JmxGaugeMBean { - + private final Gauge<?> gauge; public JmxGauge(Gauge<?> gauge) { http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java index f87f5d3..9458246 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/MetricReporter.java @@ -15,28 +15,32 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics.reporter; -import com.codahale.metrics.Reporter; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Metric; - -import java.util.List; +import org.apache.flink.metrics.groups.AbstractMetricGroup; /** - * Reporters are used to export {@link org.apache.flink.metrics.Metric}s to an external backend. - * <p> - * Reporters are instantiated generically and must have a no-argument constructor. + * Reporters are used to export {@link Metric Metrics} to an external backend. 
+ * + * <p>Reporters are instantiated via reflection and must be public, non-abstract, and have a + * public no-argument constructor. */ @PublicEvolving -public interface MetricReporter extends Reporter { - +public interface MetricReporter { + + // ------------------------------------------------------------------------ + // life cycle + // ------------------------------------------------------------------------ + /** * Configures this reporter. Since reporters are instantiated generically and hence parameter-less, * this method is the place where the reporters set their basic fields based on configuration values. - * <p> - * This method is always called first on a newly instantiated reporter. + * + * <p>This method is always called first on a newly instantiated reporter. * * @param config The configuration with all parameters. */ @@ -47,28 +51,25 @@ public interface MetricReporter extends Reporter { */ void close(); - /** - * Called when a new {@link org.apache.flink.metrics.Metric} was added. - * - * @param metric metric that was added - * @param name name of the metric - */ - void notifyOfAddedMetric(Metric metric, String name); + // ------------------------------------------------------------------------ + // adding / removing metrics + // ------------------------------------------------------------------------ /** - * Called when a {@link org.apache.flink.metrics.Metric} was removed. + * Called when a new {@link Metric} was added. * - * @param metric metric that was removed - * @param name name of the metric + * @param metric the metric that was added + * @param metricName the name of the metric + * @param group the group that contains the metric */ - void notifyOfRemovedMetric(Metric metric, String name); + void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group); /** - * Generates the reported name of a metric based on it's hierarchy/scope and associated name. + * Called when a {@link Metric} should be removed. 
+ * - * @param name name of the metric - * @param scope hierarchy/scope of the metric - * @return reported name + * @param metric the metric that should be removed + * @param metricName the name of the metric + * @param group the group that contains the metric */ - String generateName(String name, List<String> scope); + void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group); } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java b/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java index 3638f7a..cf1fc52 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/reporter/Scheduled.java @@ -15,18 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics.reporter; import org.apache.flink.annotation.PublicEvolving; /** - * Marker interface for reporters that actively send out data periodically. + * Interface for reporters that actively send out data periodically. */ @PublicEvolving public interface Scheduled { + /** - * Report the current measurements. - * This method is called in regular intervals + * Report the current measurements. This method is called periodically by the + * metrics registry that uses the reporter. 
*/ void report(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java index 83c88cc..8f00cd5 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java @@ -31,7 +31,8 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; + import org.junit.Test; @@ -42,7 +43,11 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableNotFound() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(),new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()); + RuntimeUDFContext ctx = new RuntimeUDFContext( + taskInfo, getClass().getClassLoader(), new ExecutionConfig(), + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup()); try { ctx.getBroadcastVariable("some name"); @@ -72,7 +77,11 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableSimple() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()); + 
RuntimeUDFContext ctx = new RuntimeUDFContext( + taskInfo, getClass().getClassLoader(), new ExecutionConfig(), + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup()); ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4)); ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0)); @@ -106,7 +115,11 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableWithInitializer() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()); + RuntimeUDFContext ctx = new RuntimeUDFContext( + taskInfo, getClass().getClassLoader(), new ExecutionConfig(), + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); @@ -131,7 +144,11 @@ public class RuntimeUDFContextTest { @Test public void testResetBroadcastVariableWithInitializer() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()); + RuntimeUDFContext ctx = new RuntimeUDFContext( + taskInfo, getClass().getClassLoader(), new ExecutionConfig(), + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); @@ -154,7 +171,11 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableWithInitializerAndMismatch() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()); + RuntimeUDFContext ctx = 
new RuntimeUDFContext( + taskInfo, getClass().getClassLoader(), new ExecutionConfig(), + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java index 554820e..ae0f8e5 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.io; import java.util.HashMap; @@ -27,11 +26,13 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.types.Value; -import org.junit.Assert; + import org.junit.Test; +import static org.junit.Assert.assertEquals; + /** * Tests runtime context access from inside an RichInputFormat class */ @@ -41,9 +42,14 @@ public class RichInputFormatTest { public void testCheckRuntimeContextAccess() { final SerializedInputFormat<Value> inputFormat = new SerializedInputFormat<Value>(); final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0); - inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new 
DummyMetricGroup())); + inputFormat.setRuntimeContext( + new RuntimeUDFContext( + taskInfo, getClass().getClassLoader(), new ExecutionConfig(), + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup())); - Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1); - Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3); + assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1); + assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java index 09db3a9..296af11 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java @@ -16,11 +16,9 @@ * limitations under the License. 
*/ - package org.apache.flink.api.common.io; import java.util.HashMap; -import java.util.Map; import java.util.concurrent.Future; import org.apache.flink.api.common.ExecutionConfig; @@ -28,11 +26,13 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.types.Value; -import org.junit.Assert; + import org.junit.Test; +import static org.junit.Assert.assertEquals; + /** * Tests runtime context access from inside an RichOutputFormat class */ @@ -42,9 +42,14 @@ public class RichOutputFormatTest { public void testCheckRuntimeContextAccess() { final SerializedOutputFormat<Value> inputFormat = new SerializedOutputFormat<Value>(); final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0); - inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup())); + + inputFormat.setRuntimeContext(new RuntimeUDFContext( + taskInfo, getClass().getClassLoader(), new ExecutionConfig(), + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup())); - Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1); - Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3); + assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1); + assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java index 7c905c1..71bb102 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java @@ -28,8 +28,9 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat; import org.apache.flink.api.common.operators.util.TestRichOutputFormat; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.types.Nothing; + import org.junit.Test; import java.util.HashMap; @@ -95,13 +96,20 @@ public class GenericDataSinkBaseTest implements java.io.Serializable { final TaskInfo taskInfo = new TaskInfo("test_sink", 0, 1, 0); executionConfig.disableObjectReuse(); in.reset(); - sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); - assertEquals(out.output, asList(TestIOData.RICH_NAMES)); + + sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext( + taskInfo, null, executionConfig, cpTasks, accumulatorMap, new UnregisteredMetricsGroup()), + executionConfig); + + assertEquals(out.output, asList(TestIOData.RICH_NAMES)); executionConfig.enableObjectReuse(); out.clear(); in.reset(); - sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); + + sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext( + taskInfo, null, 
executionConfig, cpTasks, accumulatorMap, new UnregisteredMetricsGroup()), + executionConfig); assertEquals(out.output, asList(TestIOData.RICH_NAMES)); } catch(Exception e){ e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java index c360c62..2dabe48 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java @@ -27,7 +27,8 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat; import org.apache.flink.api.common.operators.util.TestRichInputFormat; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.util.DummyMetricGroup; + +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.junit.Test; import java.util.HashMap; @@ -84,7 +85,11 @@ public class GenericDataSourceBaseTest implements java.io.Serializable { executionConfig.disableObjectReuse(); assertEquals(false, in.hasBeenClosed()); assertEquals(false, in.hasBeenOpened()); - List<String> resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); + + List<String> resultMutableSafe = source.executeOnCollections( + new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, + new UnregisteredMetricsGroup()), executionConfig); + assertEquals(true, in.hasBeenClosed()); assertEquals(true, in.hasBeenOpened()); @@ -92,13 +97,18 @@ public class 
GenericDataSourceBaseTest implements java.io.Serializable { executionConfig.enableObjectReuse(); assertEquals(false, in.hasBeenClosed()); assertEquals(false, in.hasBeenOpened()); - List<String> resultRegular = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); + + List<String> resultRegular = source.executeOnCollections( + new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, + new UnregisteredMetricsGroup()), executionConfig); + assertEquals(true, in.hasBeenClosed()); assertEquals(true, in.hasBeenOpened()); assertEquals(asList(TestIOData.RICH_NAMES), resultMutableSafe); assertEquals(asList(TestIOData.RICH_NAMES), resultRegular); - } catch(Exception e){ + } + catch(Exception e){ e.printStackTrace(); fail(e.getMessage()); } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java index 9447efd..f125c4b 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java @@ -29,8 +29,9 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.util.Collector; + import org.junit.Assert; import 
org.junit.Test; @@ -79,7 +80,11 @@ public class FlatMapOperatorCollectionTest implements Serializable { final TaskInfo taskInfo = new TaskInfo("Test UDF", 0, 4, 0); // run on collections final List<String> result = getTestFlatMapOperator(udf) - .executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); + .executeOnCollections(input, + new RuntimeUDFContext( + taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), new UnregisteredMetricsGroup()), + executionConfig); Assert.assertEquals(input.size(), result.size()); Assert.assertEquals(input, result); http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java index a610a4d..8befcb9 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java @@ -30,8 +30,9 @@ import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.util.Collector; + import org.junit.Test; import java.io.Serializable; @@ -125,10 +126,18 @@ public class InnerJoinOperatorBaseTest implements Serializable { final 
HashMap<String, Future<Path>> cpTasks = new HashMap<>(); ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); - List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); + List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, + new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, + accumulatorMap, new UnregisteredMetricsGroup()), + executionConfig); + executionConfig.enableObjectReuse(); - List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); + List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, + new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, + accumulatorMap, new UnregisteredMetricsGroup()), + executionConfig); assertEquals(expected, resultSafe); assertEquals(expected, resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java index 7ecdefa..d79e2a5 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java @@ -18,15 +18,6 @@ package org.apache.flink.api.common.operators.base; -import static org.junit.Assert.*; -import static java.util.Arrays.asList; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import 
java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; @@ -35,13 +26,24 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.api.common.operators.UnaryOperatorInformation; -import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; + import org.junit.Test; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + @SuppressWarnings("serial") public class MapOperatorTest implements java.io.Serializable { @@ -113,9 +115,17 @@ public class MapOperatorTest implements java.io.Serializable { final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); + + List<Integer> resultMutableSafe = op.executeOnCollections(input, + new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, + accumulatorMap, new UnregisteredMetricsGroup()), + executionConfig); + executionConfig.enableObjectReuse(); - 
List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap, new DummyMetricGroup()), executionConfig); + List<Integer> resultRegular = op.executeOnCollections(input, + new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, + accumulatorMap, new UnregisteredMetricsGroup()), + executionConfig); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java index 5012718..83c194a 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java @@ -31,7 +31,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.util.DummyMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.util.Collector; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.RichMapPartitionFunction; @@ -40,6 +40,7 @@ import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; + import org.junit.Test; 
@SuppressWarnings("serial") @@ -86,9 +87,21 @@ public class PartitionMapOperatorTest implements java.io.Serializable { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); + + List<Integer> resultMutableSafe = op.executeOnCollections(input, + new RuntimeUDFContext(taskInfo, null, executionConfig, + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup()), + executionConfig); + executionConfig.enableObjectReuse(); - List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>(), new DummyMetricGroup()), executionConfig); + List<Integer> resultRegular = op.executeOnCollections(input, + new RuntimeUDFContext(taskInfo, null, executionConfig, + new HashMap<String, Future<Path>>(), + new HashMap<String, Accumulator<?, ?>>(), + new UnregisteredMetricsGroup()), + executionConfig); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java index 32cc11c..f8e0bf5 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java @@ -15,22 +15,23 @@ * See the License for the specific language 
governing permissions and * limitations under the License. */ + package org.apache.flink.metrics; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.groups.JobMetricGroup; -import org.apache.flink.metrics.groups.OperatorMetricGroup; -import org.apache.flink.metrics.groups.Scope; +import org.apache.flink.metrics.groups.AbstractMetricGroup; import org.apache.flink.metrics.groups.TaskManagerMetricGroup; -import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.metrics.groups.scope.ScopeFormats; import org.apache.flink.metrics.reporter.Scheduled; import org.apache.flink.metrics.util.TestReporter; + import org.junit.Assert; import org.junit.Test; -import java.util.List; +import static org.junit.Assert.*; public class MetricRegistryTest { + /** * Verifies that the reporter class argument is correctly used to instantiate and open the reporter. */ @@ -118,45 +119,11 @@ public class MetricRegistryTest { } /** - * Verifies that groups are correctly created, nesting works, and names are properly forwarded to generate names. 
- */ - @Test - public void testMetricGroupGeneration() { - Configuration config = new Configuration(); - - config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter4.class.getName()); - - MetricRegistry registry = new MetricRegistry(config); - - MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); - root.counter("rootCounter"); - root.addGroup("top").counter("topCounter"); - } - - protected static class TestReporter4 extends TestReporter { - @Override - public String generateName(String name, List<String> scope) { - if (name.compareTo("rootCounter") == 0) { - Assert.assertEquals("host", scope.get(0)); - return "success"; - } else if (name.compareTo("topCounter") == 0) { - Assert.assertEquals("host", scope.get(0)); - Assert.assertEquals("taskmanager", scope.get(1)); - return "success"; - } else { - Assert.fail(); - return null; - } - } - } - - /** * Verifies that reporters implementing the Listener interface are notified when Metrics are added or removed. 
*/ @Test public void testListener() { Configuration config = new Configuration(); - config.setString(MetricRegistry.KEY_METRICS_REPORTER_CLASS, TestReporter6.class.getName()); MetricRegistry registry = new MetricRegistry(config); @@ -165,8 +132,8 @@ public class MetricRegistryTest { root.counter("rootCounter"); root.close(); - Assert.assertTrue(TestReporter6.addCalled); - Assert.assertTrue(TestReporter6.removeCalled); + assertTrue(TestReporter6.addCalled); + assertTrue(TestReporter6.removeCalled); } protected static class TestReporter6 extends TestReporter { @@ -174,22 +141,22 @@ public class MetricRegistryTest { public static boolean removeCalled = false; @Override - public void notifyOfAddedMetric(Metric metric, String name) { + public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { addCalled = true; - Assert.assertTrue(metric instanceof Counter); - Assert.assertEquals("rootCounter", name); + assertTrue(metric instanceof Counter); + assertEquals("rootCounter", metricName); } @Override - public void notifyOfRemovedMetric(Metric metric, String name) { + public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) { removeCalled = true; Assert.assertTrue(metric instanceof Counter); - Assert.assertEquals("rootCounter", name); + Assert.assertEquals("rootCounter", metricName); } } /** - * Verifies that the scope configuration is properly extracted. + * Verifies that the scopeName configuration is properly extracted. 
*/ @Test public void testScopeConfig() { @@ -200,18 +167,11 @@ public class MetricRegistryTest { config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TASK, "C"); config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_OPERATOR, "D"); - Scope.ScopeFormat scopeConfig = new MetricRegistry(config).getScopeConfig(); - - Assert.assertEquals("A", scopeConfig.getTaskManagerFormat()); - Assert.assertEquals("B", scopeConfig.getJobFormat()); - Assert.assertEquals("C", scopeConfig.getTaskFormat()); - Assert.assertEquals("D", scopeConfig.getOperatorFormat()); - - Scope.ScopeFormat emptyScopeConfig = new MetricRegistry(new Configuration()).getScopeConfig(); + ScopeFormats scopeConfig = MetricRegistry.createScopeConfig(config); - Assert.assertEquals(TaskManagerMetricGroup.DEFAULT_SCOPE_TM, emptyScopeConfig.getTaskManagerFormat()); - Assert.assertEquals(JobMetricGroup.DEFAULT_SCOPE_JOB, emptyScopeConfig.getJobFormat()); - Assert.assertEquals(TaskMetricGroup.DEFAULT_SCOPE_TASK, emptyScopeConfig.getTaskFormat()); - Assert.assertEquals(OperatorMetricGroup.DEFAULT_SCOPE_OPERATOR, emptyScopeConfig.getOperatorFormat()); + assertEquals("A", scopeConfig.getTaskManagerFormat().format()); + assertEquals("B", scopeConfig.getJobFormat().format()); + assertEquals("C", scopeConfig.getTaskFormat().format()); + assertEquals("D", scopeConfig.getOperatorFormat().format()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java index cea11fb..e820762 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java @@ -15,64 +15,77 @@ * See the License for the specific language 
governing permissions and * limitations under the License. */ + package org.apache.flink.metrics.groups; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.util.AbstractID; -import org.junit.Test; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat; -import java.util.List; +import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; public class JobGroupTest { + @Test public void testGenerateScopeDefault() { MetricRegistry registry = new MetricRegistry(new Configuration()); - TaskMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id") - .addTaskForJob(new JobID(), "job", new AbstractID(), new AbstractID(), 0, "task"); - JobMetricGroup jmGroup = tmGroup.parent(); + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + + assertArrayEquals( + new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName"}, + jmGroup.getScopeComponents()); - List<String> scope = jmGroup.generateScope(); - assertEquals(4, scope.size()); - assertEquals("job", scope.get(3)); + assertEquals( + "theHostName.taskmanager.test-tm-id.myJobName", + jmGroup.getScopeString()); } @Test - public void testGenerateScopeWildcard() { + public void testGenerateScopeCustom() { MetricRegistry registry = new MetricRegistry(new Configuration()); - TaskMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id") - .addTaskForJob(new JobID(), "job", new AbstractID(), new AbstractID(), 0, "task"); - JobMetricGroup jmGroup = tmGroup.parent(); + TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("abc"); + TaskManagerJobScopeFormat jmFormat = new 
TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat); - Scope.ScopeFormat format = new Scope.ScopeFormat(); - format.setJobFormat(Scope.concat(Scope.SCOPE_WILDCARD, "superjob", JobMetricGroup.SCOPE_JOB_NAME)); + JobID jid = new JobID(); - List<String> scope = jmGroup.generateScope(format); - assertEquals(5, scope.size()); - assertEquals("superjob", scope.get(3)); - assertEquals("job", scope.get(4)); + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); + + assertArrayEquals( + new String[] { "some-constant", "myJobName" }, + jmGroup.getScopeComponents()); + + assertEquals( + "some-constant.myJobName", + jmGroup.getScopeString()); } @Test - public void testGenerateScopeCustom() { + public void testGenerateScopeCustomWildcard() { MetricRegistry registry = new MetricRegistry(new Configuration()); - TaskMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id") - .addTaskForJob(new JobID(), "job", new AbstractID(), new AbstractID(), 0, "task"); - JobMetricGroup jmGroup = tmGroup.parent(); + TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("peter.<tm_id>"); + TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat); + + JobID jid = new JobID(); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); - Scope.ScopeFormat format = new Scope.ScopeFormat(); - format.setJobFormat(Scope.concat(TaskManagerMetricGroup.SCOPE_TM_HOST, "superjob", JobMetricGroup.SCOPE_JOB_NAME)); + assertArrayEquals( + new String[] { "peter", "test-tm-id", "some-constant", jid.toString() }, + jmGroup.getScopeComponents()); - List<String> scope = jmGroup.generateScope(format); - assertEquals(3, scope.size()); - 
assertEquals("host", scope.get(0)); - assertEquals("superjob", scope.get(1)); - assertEquals("job", scope.get(2)); + assertEquals( + "peter.test-tm-id.some-constant." + jid, + jmGroup.getScopeString()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java index cb962ea..5645b94 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupRegistrationTest.java @@ -62,13 +62,10 @@ public class MetricGroupRegistrationTest { public static String lastPassedName; @Override - public void notifyOfAddedMetric(Metric metric, String name) { + public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { lastPassedMetric = metric; - lastPassedName = name; + lastPassedName = metricName; } - - @Override - public void notifyOfRemovedMetric(Metric metric, String name) {} } /** http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java index 31ab2a7..2849bab 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/MetricGroupTest.java @@ -24,7 +24,6 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import 
org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.util.DummyMetricGroup; import org.junit.Test; import static org.junit.Assert.*; @@ -37,7 +36,8 @@ public class MetricGroupTest { @Test public void sameGroupOnNameCollision() { - GenericMetricGroup group = new GenericMetricGroup(registry, new DummyMetricGroup(registry), "somegroup"); + GenericMetricGroup group = new GenericMetricGroup( + registry, new DummyAbstractMetricGroup(registry), "somegroup"); String groupName = "sometestname"; MetricGroup subgroup1 = group.addGroup(groupName); @@ -51,7 +51,7 @@ public class MetricGroupTest { @Test public void closedGroupDoesNotRegisterMetrics() { GenericMetricGroup group = new GenericMetricGroup( - exceptionOnRegister, new DummyMetricGroup(exceptionOnRegister), "testgroup"); + exceptionOnRegister, new DummyAbstractMetricGroup(exceptionOnRegister), "testgroup"); assertFalse(group.isClosed()); group.close(); @@ -68,7 +68,7 @@ public class MetricGroupTest { @Test public void closedGroupCreatesClosedGroups() { GenericMetricGroup group = new GenericMetricGroup(exceptionOnRegister, - new DummyMetricGroup(exceptionOnRegister), "testgroup"); + new DummyAbstractMetricGroup(exceptionOnRegister), "testgroup"); assertFalse(group.isClosed()); group.close(); @@ -82,7 +82,7 @@ public class MetricGroupTest { public void tolerateMetricNameCollisions() { final String name = "abctestname"; GenericMetricGroup group = new GenericMetricGroup( - registry, new DummyMetricGroup(registry), "testgroup"); + registry, new DummyAbstractMetricGroup(registry), "testgroup"); assertNotNull(group.counter(name)); assertNotNull(group.counter(name)); @@ -91,7 +91,8 @@ public class MetricGroupTest { @Test public void tolerateMetricAndGroupNameCollisions() { final String name = "abctestname"; - GenericMetricGroup group = new GenericMetricGroup(registry, new DummyMetricGroup(registry), "testgroup"); + GenericMetricGroup group = new GenericMetricGroup( + registry, new 
DummyAbstractMetricGroup(registry), "testgroup"); assertNotNull(group.addGroup(name)); assertNotNull(group.counter(name)); @@ -100,7 +101,7 @@ public class MetricGroupTest { @Test(expected = IllegalArgumentException.class) public void exceptionOnIllegalName() { GenericMetricGroup group = new GenericMetricGroup( - exceptionOnRegister, new DummyMetricGroup(exceptionOnRegister), "testgroup"); + exceptionOnRegister, new DummyAbstractMetricGroup(exceptionOnRegister), "testgroup"); group.counter("ÜberCöunter"); } @@ -122,4 +123,21 @@ public class MetricGroupTest { fail("Metric should never be un-registered"); } } + + // ------------------------------------------------------------------------ + + private static class DummyAbstractMetricGroup extends AbstractMetricGroup { + + public DummyAbstractMetricGroup(MetricRegistry registry) { + super(registry, new String[0]); + } + + @Override + protected void addMetric(String name, Metric metric) {} + + @Override + public MetricGroup addGroup(String name) { + return new DummyAbstractMetricGroup(registry); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java index 6202834..cb5e082 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java @@ -15,73 +15,37 @@ * See the License for the specific language governing permissions and * limitations under the License.
*/ + package org.apache.flink.metrics.groups; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricRegistry; import org.apache.flink.util.AbstractID; -import org.junit.Test; -import java.util.List; +import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; public class OperatorGroupTest { - @Test - public void testGenerateScopeDefault() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - OperatorMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") - .addTaskForJob(new JobID(), "job", new AbstractID(), new AbstractID(), 0, "task") - .addOperator("operator"); - - List<String> scope = operator.generateScope(); - assertEquals(6, scope.size()); - assertEquals("host", scope.get(0)); - assertEquals("taskmanager", scope.get(1)); - assertEquals("id", scope.get(2)); - assertEquals("job", scope.get(3)); - assertEquals("operator", scope.get(4)); - assertEquals("0", scope.get(5)); - } @Test - public void testGenerateScopeWildcard() { + public void testGenerateScopeDefault() { MetricRegistry registry = new MetricRegistry(new Configuration()); - OperatorMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") - .addTaskForJob(new JobID(), "job", new AbstractID(), new AbstractID(), 0, "task") - .addOperator("operator"); - Scope.ScopeFormat format = new Scope.ScopeFormat(); - format.setOperatorFormat(Scope.concat(Scope.SCOPE_WILDCARD, "op", OperatorMetricGroup.SCOPE_OPERATOR_NAME)); - - List<String> scope = operator.generateScope(format); - assertEquals(7, scope.size()); - assertEquals("host", scope.get(0)); - assertEquals("taskmanager", scope.get(1)); - assertEquals("id", scope.get(2)); - assertEquals("job", scope.get(3)); - assertEquals("task", scope.get(4)); - assertEquals("op", scope.get(5)); - assertEquals("operator", scope.get(6)); - } - - @Test - public void 
testGenerateScopeCustom() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - OperatorMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") - .addTaskForJob(new JobID(), "job", new AbstractID(), new AbstractID(), 0, "task") - .addOperator("operator"); + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + TaskMetricGroup taskGroup = new TaskMetricGroup( + registry, jmGroup, new AbstractID(), new AbstractID(), "aTaskName", 11, 0); + OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, "myOpName"); - Scope.ScopeFormat format = new Scope.ScopeFormat(); - format.setOperatorFormat(Scope.concat("jobs", JobMetricGroup.SCOPE_JOB_NAME, "op", OperatorMetricGroup.SCOPE_OPERATOR_NAME)); + assertArrayEquals( + new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", "myOpName", "11" }, + opGroup.getScopeComponents()); - List<String> scope = operator.generateScope(format); - assertEquals(4, scope.size()); - assertEquals("jobs", scope.get(0)); - assertEquals("job", scope.get(1)); - assertEquals("op", scope.get(2)); - assertEquals("operator", scope.get(3)); + assertEquals( + "theHostName.taskmanager.test-tm-id.myJobName.myOpName.11", + opGroup.getScopeString()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java index 69faee7..4a492d2 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java @@ -15,16 +15,21 @@ * See the License for 
the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics.groups; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.scope.ScopeFormat; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskScopeFormat; import org.apache.flink.util.AbstractID; -import org.junit.Test; -import java.util.List; +import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; public class TaskGroupTest { @@ -36,48 +41,74 @@ public class TaskGroupTest { @Test public void testGenerateScopeDefault() { MetricRegistry registry = new MetricRegistry(new Configuration()); - - TaskMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") - .addTaskForJob(new JobID(), "job", new AbstractID(), new AbstractID(), 0, "task"); - List<String> scope = operator.generateScope(); - assertEquals(5, scope.size()); - assertEquals("task", scope.get(4)); + AbstractID vertexId = new AbstractID(); + AbstractID executionId = new AbstractID(); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + TaskMetricGroup taskGroup = new TaskMetricGroup(registry, jmGroup, vertexId, executionId, "aTaskName", 13, 2); + + assertArrayEquals( + new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", "aTaskName", "13"}, + taskGroup.getScopeComponents()); + + assertEquals( + "theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13", + taskGroup.getScopeString()); } @Test - public void testGenerateScopeWilcard() { + public void 
testGenerateScopeCustom() { MetricRegistry registry = new MetricRegistry(new Configuration()); - TaskMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") - .addTaskForJob(new JobID(), "job", new AbstractID(), new AbstractID(), 0, "task"); - - Scope.ScopeFormat format = new Scope.ScopeFormat(); - format.setTaskFormat(Scope.concat(Scope.SCOPE_WILDCARD, "supertask", TaskMetricGroup.SCOPE_TASK_NAME)); - - List<String> scope = operator.generateScope(format); - assertEquals(6, scope.size()); - assertEquals("host", scope.get(0)); - assertEquals("taskmanager", scope.get(1)); - assertEquals("id", scope.get(2)); - assertEquals("job", scope.get(3)); - assertEquals("supertask", scope.get(4)); - assertEquals("task", scope.get(5)); + + TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("abc"); + TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("def", tmFormat); + TaskScopeFormat taskFormat = new TaskScopeFormat("<tm_id>.<job_id>.<task_id>.<task_attempt_id>", jmFormat); + + JobID jid = new JobID(); + AbstractID vertexId = new AbstractID(); + AbstractID executionId = new AbstractID(); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, jid, "myJobName"); + TaskMetricGroup taskGroup = new TaskMetricGroup( + registry, jmGroup, taskFormat, vertexId, executionId, "aTaskName", 13, 2); + + assertArrayEquals( + new String[] { "test-tm-id", jid.toString(), vertexId.toString(), executionId.toString() }, + taskGroup.getScopeComponents()); + + assertEquals( + String.format("test-tm-id.%s.%s.%s", jid, vertexId, executionId), + taskGroup.getScopeString()); } @Test - public void testGenerateScopeCustom() { + public void testGenerateScopeWilcard() { MetricRegistry registry = new MetricRegistry(new Configuration()); - TaskMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id") - .addTaskForJob(new JobID(), "job", 
new AbstractID(), new AbstractID(), 0, "task"); - - Scope.ScopeFormat format = new Scope.ScopeFormat(); - format.setTaskFormat(Scope.concat(TaskManagerMetricGroup.SCOPE_TM_HOST, JobMetricGroup.SCOPE_JOB_NAME, "supertask", TaskMetricGroup.SCOPE_TASK_NAME)); - - List<String> scope = operator.generateScope(format); - assertEquals(4, scope.size()); - assertEquals("host", scope.get(0)); - assertEquals("job", scope.get(1)); - assertEquals("supertask", scope.get(2)); - assertEquals("task", scope.get(3)); + + TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat( + ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP); + TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat( + ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, tmFormat); + + TaskScopeFormat format = new TaskScopeFormat("*.<task_attempt_id>.<subtask_index>", jmFormat); + + AbstractID executionId = new AbstractID(); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + + TaskMetricGroup taskGroup = new TaskMetricGroup( + registry, jmGroup, format, new AbstractID(), executionId, "aTaskName", 13, 1); + + assertArrayEquals( + new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", executionId.toString(), "13" }, + taskGroup.getScopeComponents()); + + assertEquals( + "theHostName.taskmanager.test-tm-id.myJobName." 
+ executionId + ".13", + taskGroup.getScopeString()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java index 6b9a5fc..9adc1be 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerGroupTest.java @@ -15,15 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics.groups; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat; import org.apache.flink.util.AbstractID; -import org.junit.Test; -import java.util.List; +import org.junit.Test; import static org.junit.Assert.*; @@ -55,9 +56,9 @@ public class TaskManagerGroupTest { final AbstractID execution13 = new AbstractID(); final AbstractID execution21 = new AbstractID(); - TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, vertex11, execution11, 17, "test"); - TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, vertex12, execution12, 13, "test"); - TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, vertex21, execution21, 7, "test"); + TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, vertex11, execution11, "test", 17, 0); + TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, vertex12, execution12, "test", 13, 1); + TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, vertex21, execution21, "test", 7, 2); assertEquals(2, 
group.numRegisteredJobMetricGroups()); assertFalse(tmGroup11.parent().isClosed()); @@ -77,7 +78,7 @@ public class TaskManagerGroupTest { assertEquals(1, group.numRegisteredJobMetricGroups()); // add one more to job one - TaskMetricGroup tmGroup13 = group.addTaskForJob(jid1, jobName1, vertex13, execution13, 0, "test"); + TaskMetricGroup tmGroup13 = group.addTaskForJob(jid1, jobName1, vertex13, execution13, "test", 0, 0); tmGroup12.close(); tmGroup13.close(); @@ -108,9 +109,9 @@ public class TaskManagerGroupTest { final AbstractID execution12 = new AbstractID(); final AbstractID execution21 = new AbstractID(); - TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, vertex11, execution11, 17, "test"); - TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, vertex12, execution12, 13, "test"); - TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, vertex21, execution21, 7, "test"); + TaskMetricGroup tmGroup11 = group.addTaskForJob(jid1, jobName1, vertex11, execution11, "test", 17, 1); + TaskMetricGroup tmGroup12 = group.addTaskForJob(jid1, jobName1, vertex12, execution12, "test", 13, 2); + TaskMetricGroup tmGroup21 = group.addTaskForJob(jid2, jobName2, vertex21, execution21, "test", 7, 1); group.close(); @@ -120,48 +121,25 @@ public class TaskManagerGroupTest { } // ------------------------------------------------------------------------ - // scope tests + // scope name tests // ------------------------------------------------------------------------ @Test public void testGenerateScopeDefault() { MetricRegistry registry = new MetricRegistry(new Configuration()); - TaskManagerMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id"); - - List<String> scope = operator.generateScope(); - assertEquals(3, scope.size()); - assertEquals("host", scope.get(0)); - assertEquals("taskmanager", scope.get(1)); - assertEquals("id", scope.get(2)); - } - - @Test - public void testGenerateScopeWildcard() { - MetricRegistry registry = new 
MetricRegistry(new Configuration()); - TaskManagerMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id"); + TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id"); - Scope.ScopeFormat format = new Scope.ScopeFormat(); - format.setTaskManagerFormat(Scope.concat(Scope.SCOPE_WILDCARD, "superhost", TaskManagerMetricGroup.SCOPE_TM_HOST)); - - List<String> scope = operator.generateScope(format); - assertEquals(2, scope.size()); - assertEquals("superhost", scope.get(0)); - assertEquals("host", scope.get(1)); + assertArrayEquals(new String[] { "localhost", "taskmanager", "id" }, group.getScopeComponents()); + assertEquals("localhost.taskmanager.id", group.getScopeString()); } @Test public void testGenerateScopeCustom() { MetricRegistry registry = new MetricRegistry(new Configuration()); - TaskManagerMetricGroup operator = new TaskManagerMetricGroup(registry, "host", "id"); - - Scope.ScopeFormat format = new Scope.ScopeFormat(); - format.setTaskManagerFormat(Scope.concat("h", TaskManagerMetricGroup.SCOPE_TM_HOST, "t", TaskManagerMetricGroup.SCOPE_TM_ID)); + TaskManagerScopeFormat format = new TaskManagerScopeFormat("constant.<host>.foo.<host>"); + TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, format, "host", "id"); - List<String> scope = operator.generateScope(format); - assertEquals(4, scope.size()); - assertEquals("h", scope.get(0)); - assertEquals("host", scope.get(1)); - assertEquals("t", scope.get(2)); - assertEquals("id", scope.get(3)); + assertArrayEquals(new String[] { "constant", "host", "foo", "host" }, group.getScopeComponents()); + assertEquals("constant.host.foo.host", group.getScopeString()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java ---------------------------------------------------------------------- diff --git 
a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java index 0d683c2..abe1669 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/reporter/JMXReporterTest.java @@ -15,29 +15,40 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.metrics.reporter; -import org.junit.Assert; import org.junit.Test; -import java.util.ArrayList; -import java.util.List; +import static org.junit.Assert.*; public class JMXReporterTest { + + @Test + public void testReplaceInvalidChars() { + assertEquals("", JMXReporter.replaceInvalidChars("")); + assertEquals("abc", JMXReporter.replaceInvalidChars("abc")); + assertEquals("abc", JMXReporter.replaceInvalidChars("abc\"")); + assertEquals("abc", JMXReporter.replaceInvalidChars("\"abc")); + assertEquals("abc", JMXReporter.replaceInvalidChars("\"abc\"")); + assertEquals("abc", JMXReporter.replaceInvalidChars("\"a\"b\"c\"")); + assertEquals("", JMXReporter.replaceInvalidChars("\"\"\"\"")); + assertEquals("____", JMXReporter.replaceInvalidChars(" ")); + assertEquals("ab_-(c)-", JMXReporter.replaceInvalidChars("\"ab ;(c)'")); + assertEquals("a_b_c", JMXReporter.replaceInvalidChars("a b c")); + assertEquals("a_b_c_", JMXReporter.replaceInvalidChars("a b c ")); + assertEquals("a-b-c-", JMXReporter.replaceInvalidChars("a;b'c*")); + assertEquals("a------b------c", JMXReporter.replaceInvalidChars("a,=;:?'b,=;:?'c")); + } + /** * Verifies that the JMXReporter properly generates the JMX name. 
*/ @Test public void testGenerateName() { - String name = "metric"; - - List<String> scope = new ArrayList<>(); - scope.add("value0"); - scope.add("value1"); - scope.add("\"value2 (test),=;:?'"); - - String jmxName = new JMXReporter().generateName(name, scope); + String[] scope = { "value0", "value1", "\"value2 (test),=;:?'" }; + String jmxName = JMXReporter.generateJmxName("TestMetric", scope); - Assert.assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=metric", jmxName); + assertEquals("org.apache.flink.metrics:key0=value0,key1=value1,key2=value2_(test)------,name=TestMetric", jmxName); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java deleted file mode 100644 index ab78288..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyJobMetricGroup.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.util; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.JobMetricGroup; -import org.apache.flink.util.AbstractID; - -public class DummyJobMetricGroup extends JobMetricGroup { - - public DummyJobMetricGroup() { - super(new DummyMetricRegistry(), new DummyTaskManagerMetricGroup(), new JobID(), "job"); - } - - @Override - public DummyTaskMetricGroup addTask(AbstractID id, AbstractID attemptID, int subtaskIndex, String name) { - return new DummyTaskMetricGroup(); - } - - @Override - public void removeTaskMetricGroup(AbstractID executionId) {} - - @Override - protected void addMetric(String name, Metric metric) {} - - @Override - public MetricGroup addGroup(String name) { - return new DummyMetricGroup(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java deleted file mode 100644 index 77ddd17..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricGroup.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.util; - -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.AbstractMetricGroup; -import org.apache.flink.metrics.groups.Scope; - -import java.util.ArrayList; -import java.util.List; - -public class DummyMetricGroup extends AbstractMetricGroup { - - public DummyMetricGroup() { - this(new DummyMetricRegistry()); - } - - public DummyMetricGroup(MetricRegistry registry) { - super(registry); - } - - @Override - public List<String> generateScope() { - return new ArrayList<>(); - } - - @Override - public List<String> generateScope(Scope.ScopeFormat format) { - return new ArrayList<>(); - } - - @Override - protected void addMetric(String name, Metric metric) {} - - - @Override - public MetricGroup addGroup(String name) { - return new DummyMetricGroup(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java deleted file mode 100644 index f8b73a9..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyMetricRegistry.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.metrics.util; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricRegistry; - -public class DummyMetricRegistry extends MetricRegistry { - - public DummyMetricRegistry() { - super(new Configuration()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7ad8375a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java b/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java deleted file mode 100644 index e271d6e..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/util/DummyOperatorMetricGroup.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.metrics.util; - -import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.OperatorMetricGroup; - -public class DummyOperatorMetricGroup extends OperatorMetricGroup { - - public DummyOperatorMetricGroup() { - super(new DummyMetricRegistry(), new DummyTaskMetricGroup(), "operator", 0); - } - - @Override - protected void addMetric(String name, Metric metric) {} - - @Override - public MetricGroup addGroup(String name) { - return new DummyMetricGroup(); - } -}
