http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java new file mode 100644 index 0000000..8e23d2f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.scope.TaskScopeFormat; +import org.apache.flink.util.AbstractID; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} representing a Flink runtime Task. + * + * <p>Contains extra logic for adding operators. 
+ */ +public class TaskMetricGroup extends ComponentMetricGroup { + + /** The job metrics group containing this task metrics group */ + private final TaskManagerJobMetricGroup parent; + + private final Map<String, OperatorMetricGroup> operators = new HashMap<>(); + + private final IOMetricGroup ioMetrics; + + /** The execution Id uniquely identifying the executed task represented by this metrics group */ + private final AbstractID executionId; + + @Nullable + private final AbstractID vertexId; + + @Nullable + private final String taskName; + + private final int subtaskIndex; + + private final int attemptNumber; + + // ------------------------------------------------------------------------ + + public TaskMetricGroup( + MetricRegistry registry, + TaskManagerJobMetricGroup parent, + @Nullable AbstractID vertexId, + AbstractID executionId, + @Nullable String taskName, + int subtaskIndex, + int attemptNumber) { + + this(registry, parent, registry.getScopeFormats().getTaskFormat(), + vertexId, executionId, taskName, subtaskIndex, attemptNumber); + } + + public TaskMetricGroup( + MetricRegistry registry, + TaskManagerJobMetricGroup parent, + TaskScopeFormat scopeFormat, + @Nullable AbstractID vertexId, + AbstractID executionId, + @Nullable String taskName, + int subtaskIndex, + int attemptNumber) { + + super(registry, scopeFormat.formatScope( + parent, vertexId, executionId, taskName, subtaskIndex, attemptNumber)); + + this.parent = checkNotNull(parent); + this.executionId = checkNotNull(executionId); + this.vertexId = vertexId; + this.taskName = taskName; + this.subtaskIndex = subtaskIndex; + this.attemptNumber = attemptNumber; + + this.ioMetrics = new IOMetricGroup(this); + } + + // ------------------------------------------------------------------------ + // properties + // ------------------------------------------------------------------------ + + public final TaskManagerJobMetricGroup parent() { + return parent; + } + + public AbstractID executionId() { + return 
executionId; + } + + @Nullable + public AbstractID vertexId() { + return vertexId; + } + + @Nullable + public String taskName() { + return taskName; + } + + public int subtaskIndex() { + return subtaskIndex; + } + + public int attemptNumber() { + return attemptNumber; + } + + /** + * Returns the IOMetricGroup for this task. + * + * @return IOMetricGroup for this task. + */ + public IOMetricGroup getIOMetricGroup() { + return ioMetrics; + } + + // ------------------------------------------------------------------------ + // operators and cleanup + // ------------------------------------------------------------------------ + + public OperatorMetricGroup addOperator(String name) { + OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name); + + synchronized (this) { + OperatorMetricGroup previous = operators.put(name, operator); + if (previous == null) { + // no operator group so far + return operator; + } else { + // already had an operator group. restore that one. + operators.put(name, previous); + return previous; + } + } + } + + @Override + public void close() { + super.close(); + + parent.removeTaskMetricGroup(executionId); + } + + // ------------------------------------------------------------------------ + // Component Metric Group Specifics + // ------------------------------------------------------------------------ + + @Override + protected Iterable<? extends ComponentMetricGroup> subComponents() { + return operators.values(); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerJobScopeFormat.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerJobScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerJobScopeFormat.java new file mode 100644 index 0000000..2d5356a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerJobScopeFormat.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.scope; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; + +/** + * The scope format for the {@link org.apache.flink.runtime.metrics.groups.JobMetricGroup}. 
+ */ +public class JobManagerJobScopeFormat extends ScopeFormat { + + public JobManagerJobScopeFormat(String format, JobManagerScopeFormat parentFormat) { + super(format, parentFormat, new String[] { + SCOPE_ACTOR_HOST, + SCOPE_JOB_ID, + SCOPE_JOB_NAME + }); + } + + public String[] formatScope(JobManagerMetricGroup parent, JobID jid, String jobName) { + final String[] template = copyTemplate(); + final String[] values = { + parent.hostname(), + valueOrNull(jid), + valueOrNull(jobName) + }; + return bindVariables(template, values); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerScopeFormat.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerScopeFormat.java new file mode 100644 index 0000000..14f1b72 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/JobManagerScopeFormat.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics.scope; + +/** + * The scope format for the {@link org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup}. + */ +public class JobManagerScopeFormat extends ScopeFormat { + + public JobManagerScopeFormat(String format) { + super(format, null, new String[] { + SCOPE_ACTOR_HOST + }); + } + + public String[] formatScope(String hostname) { + final String[] template = copyTemplate(); + final String[] values = { hostname }; + return bindVariables(template, values); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/OperatorScopeFormat.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/OperatorScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/OperatorScopeFormat.java new file mode 100644 index 0000000..fcebe37 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/OperatorScopeFormat.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics.scope; + +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; + +/** + * The scope format for the {@link org.apache.flink.runtime.metrics.groups.OperatorMetricGroup}. + */ +public class OperatorScopeFormat extends ScopeFormat { + + public OperatorScopeFormat(String format, TaskScopeFormat parentFormat) { + super(format, parentFormat, new String[] { + SCOPE_ACTOR_HOST, + SCOPE_TASKMANAGER_ID, + SCOPE_JOB_ID, + SCOPE_JOB_NAME, + SCOPE_TASK_VERTEX_ID, + SCOPE_TASK_ATTEMPT_ID, + SCOPE_TASK_NAME, + SCOPE_TASK_SUBTASK_INDEX, + SCOPE_TASK_ATTEMPT_NUM, + SCOPE_OPERATOR_NAME + }); + } + + public String[] formatScope(TaskMetricGroup parent, String operatorName) { + + final String[] template = copyTemplate(); + final String[] values = { + parent.parent().parent().hostname(), + parent.parent().parent().taskManagerId(), + valueOrNull(parent.parent().jobId()), + valueOrNull(parent.parent().jobName()), + valueOrNull(parent.vertexId()), + valueOrNull(parent.executionId()), + valueOrNull(parent.taskName()), + String.valueOf(parent.subtaskIndex()), + String.valueOf(parent.attemptNumber()), + valueOrNull(operatorName) + }; + return bindVariables(template, values); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java new file mode 100644 index 0000000..a45df41 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.scope; + +import org.apache.flink.metrics.CharacterFilter; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class represents the format after which the "scope" (or namespace) of the various + * component metric groups is built. Component metric groups are for example + * "TaskManager", "Task", or "Operator". + * + * <p>User defined scope formats allow users to include or exclude + * certain identifiers from the scope. The scope for metrics belonging to the "Task" + * group could for example include the task attempt number (more fine grained identification), or + * exclude it (continuity of the namespace across failure and recovery). 
+ */ +public abstract class ScopeFormat { + + private static CharacterFilter defaultFilter = new CharacterFilter() { + @Override + public String filterCharacters(String input) { + return input; + } + }; + + // ------------------------------------------------------------------------ + // Scope Format Special Characters + // ------------------------------------------------------------------------ + + /** + * If the scope format starts with this character, then the parent components scope + * format will be used as a prefix. + * + * <p>For example, if the TaskManager's job format is {@code "*.<job_name>"}, and the + * TaskManager format is {@code "<host>"}, then the job's metrics + * will have {@code "<host>.<job_name>"} as their scope. + */ + public static final String SCOPE_INHERIT_PARENT = "*"; + + public static final String SCOPE_SEPARATOR = "."; + + private static final String SCOPE_VARIABLE_PREFIX = "<"; + private static final String SCOPE_VARIABLE_SUFFIX = ">"; + + // ------------------------------------------------------------------------ + // Scope Variables + // ------------------------------------------------------------------------ + + public static final String SCOPE_ACTOR_HOST = asVariable("host"); + + // ----- Job Manager ---- + + /** The default scope format of the JobManager component: {@code "<host>.jobmanager"} */ + public static final String DEFAULT_SCOPE_JOBMANAGER_COMPONENT = + concat(SCOPE_ACTOR_HOST, "jobmanager"); + + /** The default scope format of JobManager metrics: {@code "<host>.jobmanager"} */ + public static final String DEFAULT_SCOPE_JOBMANAGER_GROUP = DEFAULT_SCOPE_JOBMANAGER_COMPONENT; + + // ----- Task Manager ---- + + public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id"); + + /** The default scope format of the TaskManager component: {@code "<host>.taskmanager.<tm_id>"} */ + public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT = + concat(SCOPE_ACTOR_HOST, "taskmanager", SCOPE_TASKMANAGER_ID); + + /** The 
default scope format of TaskManager metrics: {@code "<host>.taskmanager.<tm_id>"} */ + public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = DEFAULT_SCOPE_TASKMANAGER_COMPONENT; + + // ----- Job ----- + + public static final String SCOPE_JOB_ID = asVariable("job_id"); + public static final String SCOPE_JOB_NAME = asVariable("job_name"); + + /** The default scope format for the job component: {@code "<job_name>"} */ + public static final String DEFAULT_SCOPE_JOB_COMPONENT = SCOPE_JOB_NAME; + + // ----- Job on Job Manager ---- + + /** The default scope format for all job metrics on a jobmanager: {@code "<host>.jobmanager.<job_name>"} */ + public static final String DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP = + concat(DEFAULT_SCOPE_JOBMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT); + + // ----- Job on Task Manager ---- + + /** The default scope format for all job metrics on a taskmanager: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */ + public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP = + concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT); + + // ----- Task ---- + + public static final String SCOPE_TASK_VERTEX_ID = asVariable("task_id"); + public static final String SCOPE_TASK_NAME = asVariable("task_name"); + public static final String SCOPE_TASK_ATTEMPT_ID = asVariable("task_attempt_id"); + public static final String SCOPE_TASK_ATTEMPT_NUM = asVariable("task_attempt_num"); + public static final String SCOPE_TASK_SUBTASK_INDEX = asVariable("subtask_index"); + + /** Default scope of the task component: {@code "<task_name>.<subtask_index>"} */ + public static final String DEFAULT_SCOPE_TASK_COMPONENT = + concat(SCOPE_TASK_NAME, SCOPE_TASK_SUBTASK_INDEX); + + /** The default scope format for all task metrics: + * {@code "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"} */ + public static final String DEFAULT_SCOPE_TASK_GROUP = + concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_TASK_COMPONENT); + + // 
----- Operator ---- + + public static final String SCOPE_OPERATOR_NAME = asVariable("operator_name"); + + /** The default scope added by the operator component: "<operator_name>.<subtask_index>" */ + public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT = + concat(SCOPE_OPERATOR_NAME, SCOPE_TASK_SUBTASK_INDEX); + + /** The default scope format for all operator metrics: + * {@code "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"} */ + public static final String DEFAULT_SCOPE_OPERATOR_GROUP = + concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, DEFAULT_SCOPE_OPERATOR_COMPONENT); + + + // ------------------------------------------------------------------------ + // Scope Format Base + // ------------------------------------------------------------------------ + + /** The scope format */ + private final String format; + + /** The format, split into components */ + private final String[] template; + + private final int[] templatePos; + + private final int[] valuePos; + + // ------------------------------------------------------------------------ + + protected ScopeFormat(String format, ScopeFormat parent, String[] variables) { + checkNotNull(format, "format is null"); + + final String[] rawComponents = format.split("\\" + SCOPE_SEPARATOR); + + // compute the template array + final boolean parentAsPrefix = rawComponents.length > 0 && rawComponents[0].equals(SCOPE_INHERIT_PARENT); + if (parentAsPrefix) { + if (parent == null) { + throw new IllegalArgumentException("Component scope format requires parent prefix (starts with '" + + SCOPE_INHERIT_PARENT + "'), but this component has no parent (is root component)."); + } + + this.format = format.length() > 2 ? 
format.substring(2) : "<empty>"; + + String[] parentTemplate = parent.template; + int parentLen = parentTemplate.length; + + this.template = new String[parentLen + rawComponents.length - 1]; + System.arraycopy(parentTemplate, 0, this.template, 0, parentLen); + System.arraycopy(rawComponents, 1, this.template, parentLen, rawComponents.length - 1); + } + else { + this.format = format.isEmpty() ? "<empty>" : format; + this.template = rawComponents; + } + + // --- compute the replacement matrix --- + // a bit of clumsy Java collections code ;-) + + HashMap<String, Integer> varToValuePos = arrayToMap(variables); + List<Integer> templatePos = new ArrayList<>(); + List<Integer> valuePos = new ArrayList<>(); + + for (int i = 0; i < template.length; i++) { + final String component = template[i]; + + // check if that is a variable + if (component != null && component.length() >= 3 && + component.charAt(0) == '<' && component.charAt(component.length() - 1) == '>') { + + // this is a variable + Integer replacementPos = varToValuePos.get(component); + if (replacementPos != null) { + templatePos.add(i); + valuePos.add(replacementPos); + } + } + } + + this.templatePos = integerListToArray(templatePos); + this.valuePos = integerListToArray(valuePos); + } + + // ------------------------------------------------------------------------ + + public String format() { + return format; + } + + protected final String[] copyTemplate() { + String[] copy = new String[template.length]; + System.arraycopy(template, 0, copy, 0, template.length); + return copy; + } + + protected final String[] bindVariables(String[] template, String[] values) { + final int len = templatePos.length; + for (int i = 0; i < len; i++) { + template[templatePos[i]] = values[valuePos[i]]; + } + return template; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "ScopeFormat '" + format + '\''; + } + + // 
------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Formats the given string to resemble a scope variable. + * + * @param scope The string to format + * @return The formatted string + */ + public static String asVariable(String scope) { + return SCOPE_VARIABLE_PREFIX + scope + SCOPE_VARIABLE_SUFFIX; + } + + public static String concat(String... components) { + return concat(defaultFilter, '.', components); + } + + public static String concat(CharacterFilter filter, String... components) { + return concat(filter, '.', components); + } + + public static String concat(Character delimiter, String... components) { + return concat(defaultFilter, delimiter, components); + } + + /** + * Concatenates the given component names separated by the delimiter character. Additionally + * the character filter is applied to all component names. + * + * @param filter Character filter to be applied to the component names + * @param delimiter Delimiter to separate component names + * @param components Array of component names + * @return The concatenated component name + */ + public static String concat(CharacterFilter filter, Character delimiter, String... components) { + StringBuilder sb = new StringBuilder(); + sb.append(filter.filterCharacters(components[0])); + for (int x = 1; x < components.length; x++) { + sb.append(delimiter); + sb.append(filter.filterCharacters(components[x])); + } + return sb.toString(); + } + + protected static String valueOrNull(Object value) { + return (value == null || (value instanceof String && ((String) value).isEmpty())) ? 
+ "null" : value.toString(); + } + + protected static HashMap<String, Integer> arrayToMap(String[] array) { + HashMap<String, Integer> map = new HashMap<>(array.length); + for (int i = 0; i < array.length; i++) { + map.put(array[i], i); + } + return map; + } + + private static int[] integerListToArray(List<Integer> list) { + int[] array = new int[list.size()]; + int pos = 0; + for (Integer i : list) { + array[pos++] = i; + } + return array; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java new file mode 100644 index 0000000..bbbe6ba --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics.scope; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A container for component scope formats. + */ +public class ScopeFormats { + + private final JobManagerScopeFormat jobManagerFormat; + private final JobManagerJobScopeFormat jobManagerJobFormat; + private final TaskManagerScopeFormat taskManagerFormat; + private final TaskManagerJobScopeFormat taskManagerJobFormat; + private final TaskScopeFormat taskFormat; + private final OperatorScopeFormat operatorFormat; + + // ------------------------------------------------------------------------ + + /** + * Creates all default scope formats. + */ + public ScopeFormats() { + this.jobManagerFormat = new JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT); + + this.jobManagerJobFormat = new JobManagerJobScopeFormat( + ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, this.jobManagerFormat); + + this.taskManagerFormat = new TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT); + + this.taskManagerJobFormat = new TaskManagerJobScopeFormat( + ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, this.taskManagerFormat); + + this.taskFormat = new TaskScopeFormat( + ScopeFormat.DEFAULT_SCOPE_TASK_GROUP, this.taskManagerJobFormat); + + this.operatorFormat = new OperatorScopeFormat( + ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP, this.taskFormat); + } + + /** + * Creates all scope formats, based on the given scope format strings. 
+ */ + public ScopeFormats( + String jobManagerFormat, + String jobManagerJobFormat, + String taskManagerFormat, + String taskManagerJobFormat, + String taskFormat, + String operatorFormat) + { + this.jobManagerFormat = new JobManagerScopeFormat(jobManagerFormat); + this.jobManagerJobFormat = new JobManagerJobScopeFormat(jobManagerJobFormat, this.jobManagerFormat); + this.taskManagerFormat = new TaskManagerScopeFormat(taskManagerFormat); + this.taskManagerJobFormat = new TaskManagerJobScopeFormat(taskManagerJobFormat, this.taskManagerFormat); + this.taskFormat = new TaskScopeFormat(taskFormat, this.taskManagerJobFormat); + this.operatorFormat = new OperatorScopeFormat(operatorFormat, this.taskFormat); + } + + /** + * Creates a {@code ScopeFormats} with the given scope formats. + */ + public ScopeFormats( + JobManagerScopeFormat jobManagerFormat, + JobManagerJobScopeFormat jobManagerJobFormat, + TaskManagerScopeFormat taskManagerFormat, + TaskManagerJobScopeFormat taskManagerJobFormat, + TaskScopeFormat taskFormat, + OperatorScopeFormat operatorFormat) + { + this.jobManagerFormat = checkNotNull(jobManagerFormat); + this.jobManagerJobFormat = checkNotNull(jobManagerJobFormat); + this.taskManagerFormat = checkNotNull(taskManagerFormat); + this.taskManagerJobFormat = checkNotNull(taskManagerJobFormat); + this.taskFormat = checkNotNull(taskFormat); + this.operatorFormat = checkNotNull(operatorFormat); + } + + // ------------------------------------------------------------------------ + // Accessors + // ------------------------------------------------------------------------ + + public JobManagerScopeFormat getJobManagerFormat() { + return this.jobManagerFormat; + } + + public TaskManagerScopeFormat getTaskManagerFormat() { + return this.taskManagerFormat; + } + + public TaskManagerJobScopeFormat getTaskManagerJobFormat() { + return this.taskManagerJobFormat; + } + + public JobManagerJobScopeFormat getJobManagerJobFormat() { + return this.jobManagerJobFormat; + } + + 
public TaskScopeFormat getTaskFormat() { + return this.taskFormat; + } + + public OperatorScopeFormat getOperatorFormat() { + return this.operatorFormat; + } + + // ------------------------------------------------------------------------ + // Parsing from Config + // ------------------------------------------------------------------------ + + /** + * Creates the scope formats as defined in the given configuration + * + * @param config The configuration that defines the formats + * @return The ScopeFormats parsed from the configuration + */ + public static ScopeFormats fromConfig(Configuration config) { + String jmFormat = config.getString( + ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP); + String jmJobFormat = config.getString( + ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP); + String tmFormat = config.getString( + ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP); + String tmJobFormat = config.getString( + ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP); + String taskFormat = config.getString( + ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP); + String operatorFormat = config.getString( + ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP); + + return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerJobScopeFormat.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerJobScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerJobScopeFormat.java new file mode 100644 index 0000000..558e710 --- 
/dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerJobScopeFormat.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.scope; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; + +/** + * The scope format for the {@link org.apache.flink.runtime.metrics.groups.JobMetricGroup}. 
+ */ +public class TaskManagerJobScopeFormat extends ScopeFormat { + + public TaskManagerJobScopeFormat(String format, TaskManagerScopeFormat parentFormat) { + super(format, parentFormat, new String[] { + SCOPE_ACTOR_HOST, + SCOPE_TASKMANAGER_ID, + SCOPE_JOB_ID, + SCOPE_JOB_NAME + }); + } + + public String[] formatScope(TaskManagerMetricGroup parent, JobID jid, String jobName) { + final String[] template = copyTemplate(); + final String[] values = { + parent.hostname(), + parent.taskManagerId(), + valueOrNull(jid), + valueOrNull(jobName) + }; + return bindVariables(template, values); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerScopeFormat.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerScopeFormat.java new file mode 100644 index 0000000..6e7c39d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskManagerScopeFormat.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.scope; + +/** + * The scope format for the {@link org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup}. + */ +public class TaskManagerScopeFormat extends ScopeFormat { + + public TaskManagerScopeFormat(String format) { + super(format, null, new String[] { + SCOPE_ACTOR_HOST, + SCOPE_TASKMANAGER_ID + }); + } + + public String[] formatScope(String hostname, String taskManagerId) { + final String[] template = copyTemplate(); + final String[] values = { hostname, taskManagerId }; + return bindVariables(template, values); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskScopeFormat.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskScopeFormat.java new file mode 100644 index 0000000..a781bf1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/TaskScopeFormat.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.scope; + +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.util.AbstractID; + +/** + * The scope format for the {@link org.apache.flink.runtime.metrics.groups.TaskMetricGroup}. + */ +public class TaskScopeFormat extends ScopeFormat { + + public TaskScopeFormat(String format, TaskManagerJobScopeFormat parentFormat) { + super(format, parentFormat, new String[] { + SCOPE_ACTOR_HOST, + SCOPE_TASKMANAGER_ID, + SCOPE_JOB_ID, + SCOPE_JOB_NAME, + SCOPE_TASK_VERTEX_ID, + SCOPE_TASK_ATTEMPT_ID, + SCOPE_TASK_NAME, + SCOPE_TASK_SUBTASK_INDEX, + SCOPE_TASK_ATTEMPT_NUM + }); + } + + public String[] formatScope( + TaskManagerJobMetricGroup parent, + AbstractID vertexId, AbstractID attemptId, + String taskName, int subtask, int attemptNumber) { + + final String[] template = copyTemplate(); + final String[] values = { + parent.parent().hostname(), + parent.parent().taskManagerId(), + valueOrNull(parent.jobId()), + valueOrNull(parent.jobName()), + valueOrNull(vertexId), + valueOrNull(attemptId), + valueOrNull(taskName), + String.valueOf(subtask), + String.valueOf(attemptNumber) + }; + return bindVariables(template, values); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java index 3b72730..6fdf6f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ 
-23,7 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.execution.Environment; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 25e4b43..dbc0b62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -25,7 +25,7 @@ import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 
ef52381..0026bef 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -35,8 +35,8 @@ import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.io.InputSplitAssigner -import org.apache.flink.metrics.{Gauge, MetricGroup, MetricRegistry => FlinkMetricRegistry} -import org.apache.flink.metrics.groups.{JobManagerMetricGroup, UnregisteredMetricsGroup} +import org.apache.flink.metrics.{Gauge, MetricGroup} +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.blob.BlobServer @@ -71,6 +71,8 @@ import org.apache.flink.runtime.messages.checkpoint.{DeclineCheckpoint, Abstract import org.apache.flink.runtime.messages.webmonitor.InfoMessage import org.apache.flink.runtime.messages.webmonitor._ +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner @@ -1114,7 +1116,7 @@ class JobManager( val jobMetrics = jobManagerMetricGroup match { case Some(group) => - group.addJob(jobGraph.getJobID, jobGraph.getName) match { + group.addJob(jobGraph) match { case (jobGroup:Any) => jobGroup case null => new UnregisteredMetricsGroup() } http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff 
--git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 0ff929c..dcf1e38 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -38,8 +38,7 @@ import grizzled.slf4j.Logger import org.apache.flink.configuration._ import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType} -import org.apache.flink.metrics.groups.TaskManagerMetricGroup -import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge, MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge} import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.clusterframework.messages.StopCluster import org.apache.flink.runtime.clusterframework.types.ResourceID @@ -65,6 +64,8 @@ import org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStack import org.apache.flink.runtime.messages.TaskManagerMessages._ import org.apache.flink.runtime.messages.TaskMessages._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint} +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner @@ -1092,11 +1093,7 @@ class TaskManager( jobName = tdd.getJobName } - val taskMetricGroup = taskManagerMetricGroup - .addTaskForJob( - tdd.getJobID, jobName, - tdd.getVertexID, tdd.getExecutionId, tdd.getTaskName, - tdd.getIndexInSubtaskGroup, 
tdd.getAttemptNumber) + val taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd) val task = new Task( tdd, http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index bebcf7b..16331ac 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -21,7 +21,6 @@ package org.apache.flink.runtime.testingUtils import akka.actor.ActorRef import org.apache.flink.configuration.Configuration -import org.apache.flink.metrics.MetricRegistry import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager @@ -30,6 +29,7 @@ import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.metrics.MetricRegistry import scala.concurrent.duration._ import scala.language.postfixOps http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index 219e440..630335b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -23,10 +23,8 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.AbstractMetricGroup; -import org.apache.flink.metrics.groups.JobManagerMetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobKey; @@ -44,6 +42,8 @@ import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -274,18 +274,18 @@ public class ExecutionGraphMetricsTest extends TestLogger { private final Map<String, Metric> metrics = new HashMap<>(); @Override - public void open(Configuration config) {} + public void open(MetricConfig config) {} @Override public void close() {} @Override - public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) { + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { metrics.put(metricName, metric); } 
@Override - public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) { + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { metrics.remove(metricName); } http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java deleted file mode 100644 index 1e6f019..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.runtime.jobmanager; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; -import org.apache.flink.runtime.testingUtils.TestingCluster; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; -import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; - -import javax.management.MBeanServer; -import javax.management.ObjectName; -import java.lang.management.ManagementFactory; -import java.util.Collections; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; - -public class JobManagerMetricTest { - /** - * Tests that metrics registered on the JobManager are actually accessible. 
- * - * @throws Exception - */ - @Test - public void testJobManagerMetricAccess() throws Exception { - Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); - Configuration flinkConfiguration = new Configuration(); - - flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_CLASS, "org.apache.flink.metrics.reporter.JMXReporter"); - flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>"); - flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "--port 9060-9075"); - - TestingCluster flink = new TestingCluster(flinkConfiguration); - - try { - flink.start(); - - JobVertex sourceJobVertex = new JobVertex("Source"); - sourceJobVertex.setInvokableClass(BlockingInvokable.class); - - JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex); - jobGraph.setSnapshotSettings(new JobSnapshottingSettings( - Collections.<JobVertexID>emptyList(), - Collections.<JobVertexID>emptyList(), - Collections.<JobVertexID>emptyList(), - 500, 500, 50, 5)); - - flink.waitForActorsToBeAlive(); - - flink.submitJobDetached(jobGraph); - - Future<Object> jobRunning = flink.getLeaderGateway(deadline.timeLeft()) - .ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), deadline.timeLeft()); - Await.ready(jobRunning, deadline.timeLeft()); - - MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - ObjectName objectName1 = new ObjectName("org.apache.flink.metrics:key0=jobmanager,key1=TestingJob,name=lastCheckpointSize"); - assertEquals(-1L, mBeanServer.getAttribute(objectName1, "Value")); - - Future<Object> jobFinished = flink.getLeaderGateway(deadline.timeLeft()) - .ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft()); - - BlockingInvokable.unblock(); - - // wait til the job has finished - Await.ready(jobFinished, deadline.timeLeft()); - } finally { - flink.stop(); - } - } - - public static class BlockingInvokable 
extends AbstractInvokable { - private static boolean blocking = true; - private static final Object lock = new Object(); - - @Override - public void invoke() throws Exception { - while (blocking) { - synchronized (lock) { - lock.wait(); - } - } - } - - public static void unblock() { - blocking = false; - - synchronized (lock) { - lock.notifyAll(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java new file mode 100644 index 0000000..3252e3d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.scope.ScopeFormats; +import org.apache.flink.runtime.metrics.util.TestReporter; + +import org.apache.flink.util.TestLogger; +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class MetricRegistryTest extends TestLogger { + + /** + * Verifies that the reporter class argument is correctly used to instantiate and open the reporter. + */ + @Test + public void testReporterInstantiation() { + Configuration config = new Configuration(); + + config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter1.class.getName()); + + new MetricRegistry(config); + + Assert.assertTrue(TestReporter1.wasOpened); + } + + protected static class TestReporter1 extends TestReporter { + public static boolean wasOpened = false; + + @Override + public void open(MetricConfig config) { + wasOpened = true; + } + } + + /** + * Verifies that configured arguments are properly forwarded to the reporter. 
+ */ + @Test + public void testReporterArgumentForwarding() { + Configuration config = new Configuration(); + + config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter2.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_ARGUMENTS, "--arg1 hello --arg2 world"); + + new MetricRegistry(config); + } + + protected static class TestReporter2 extends TestReporter { + @Override + public void open(MetricConfig config) { + Assert.assertEquals("hello", config.getString("arg1", null)); + Assert.assertEquals("world", config.getString("arg2", null)); + } + } + + /** + * Verifies that reporters implementing the Scheduled interface are regularly called to report the metrics. + * + * @throws InterruptedException + */ + @Test + public void testReporterScheduling() throws InterruptedException { + Configuration config = new Configuration(); + + config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter3.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_INTERVAL, "50 MILLISECONDS"); + + new MetricRegistry(config); + + long start = System.currentTimeMillis(); + for (int x = 0; x < 10; x++) { + Thread.sleep(100); + int reportCount = TestReporter3.reportCount; + long curT = System.currentTimeMillis(); + /** + * Within a given time-frame T only T/50 reports may be triggered due to the interval between reports. + * This value however does not take the first triggered report into account (=> +1). + * Furthermore we have to account for the mis-alignment between reports being triggered and our time + * measurement (=> +1); for T=200 a total of 4-6 reports may have been + * triggered depending on whether the interval for the first report ends before + * or after T=50. 
+ */ + long maxAllowedReports = (curT - start) / 50 + 2; + Assert.assertTrue("Too many report were triggered.", maxAllowedReports >= reportCount); + } + Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0); + } + + protected static class TestReporter3 extends TestReporter implements Scheduled { + public static int reportCount = 0; + + @Override + public void report() { + reportCount++; + } + } + + /** + * Verifies that reporters implementing the Listener interface are notified when Metrics are added or removed. + */ + @Test + public void testListener() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter6.class.getName()); + + MetricRegistry registry = new MetricRegistry(config); + + TaskManagerMetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); + root.counter("rootCounter"); + root.close(); + + assertTrue(TestReporter6.addCalled); + assertTrue(TestReporter6.removeCalled); + } + + protected static class TestReporter6 extends TestReporter { + public static boolean addCalled = false; + public static boolean removeCalled = false; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + addCalled = true; + assertTrue(metric instanceof Counter); + assertEquals("rootCounter", metricName); + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + removeCalled = true; + Assert.assertTrue(metric instanceof Counter); + Assert.assertEquals("rootCounter", metricName); + } + } + + /** + * Verifies that the scope configuration is properly extracted. 
+ */ + @Test + public void testScopeConfig() { + Configuration config = new Configuration(); + + config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A"); + config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "B"); + config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "C"); + config.setString(ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, "D"); + + ScopeFormats scopeConfig = MetricRegistry.createScopeConfig(config); + + assertEquals("A", scopeConfig.getTaskManagerFormat().format()); + assertEquals("B", scopeConfig.getTaskManagerJobFormat().format()); + assertEquals("C", scopeConfig.getTaskFormat().format()); + assertEquals("D", scopeConfig.getOperatorFormat().format()); + } + + @Test + public void testConfigurableDelimiter() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_"); + config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D.E"); + + MetricRegistry registry = new MetricRegistry(config); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id"); + assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name")); + + registry.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java new file mode 100644 index 0000000..ddb5dfc --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class JobManagerGroupTest { + + // ------------------------------------------------------------------------ + // adding and removing jobs + // ------------------------------------------------------------------------ + + @Test + public void addAndRemoveJobs() { + final JobManagerMetricGroup group = new JobManagerMetricGroup( + new MetricRegistry(new Configuration()), "localhost"); + + final JobID jid1 = new JobID(); + final JobID jid2 = new JobID(); + + final String jobName1 = "testjob"; + final String jobName2 = "anotherJob"; + + JobManagerJobMetricGroup jmJobGroup11 = group.addJob(new JobGraph(jid1, jobName1)); + JobManagerJobMetricGroup jmJobGroup12 = group.addJob(new JobGraph(jid1, jobName1)); + JobManagerJobMetricGroup jmJobGroup21 = group.addJob(new JobGraph(jid2, 
jobName2)); + + assertEquals(jmJobGroup11, jmJobGroup12); + + assertEquals(2, group.numRegisteredJobMetricGroups()); + + group.removeJob(jid1); + + assertTrue(jmJobGroup11.isClosed()); + assertEquals(1, group.numRegisteredJobMetricGroups()); + + group.removeJob(jid2); + + assertTrue(jmJobGroup21.isClosed()); + assertEquals(0, group.numRegisteredJobMetricGroups()); + } + + @Test + public void testCloseClosesAll() { + final JobManagerMetricGroup group = new JobManagerMetricGroup( + new MetricRegistry(new Configuration()), "localhost"); + + final JobID jid1 = new JobID(); + final JobID jid2 = new JobID(); + + final String jobName1 = "testjob"; + final String jobName2 = "anotherJob"; + + JobManagerJobMetricGroup jmJobGroup11 = group.addJob(new JobGraph(jid1, jobName1)); + JobManagerJobMetricGroup jmJobGroup21 = group.addJob(new JobGraph(jid2, jobName2)); + + group.close(); + + assertTrue(jmJobGroup11.isClosed()); + assertTrue(jmJobGroup21.isClosed()); + } + + // ------------------------------------------------------------------------ + // scope name tests + // ------------------------------------------------------------------------ + + @Test + public void testGenerateScopeDefault() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); + + assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents()); + assertEquals("localhost.jobmanager.name", group.getMetricIdentifier("name")); + } + + @Test + public void testGenerateScopeCustom() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + JobManagerScopeFormat format = new JobManagerScopeFormat("constant.<host>.foo.<host>"); + JobManagerMetricGroup group = new JobManagerMetricGroup(registry, format, "host"); + + assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents()); + assertEquals("constant.host.foo.host.name", 
group.getMetricIdentifier("name")); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java new file mode 100644 index 0000000..a13916b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.scope.JobManagerJobScopeFormat; +import org.apache.flink.runtime.metrics.scope.JobManagerScopeFormat; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class JobManagerJobGroupTest { + + @Test + public void testGenerateScopeDefault() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + + JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); + JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + + assertArrayEquals( + new String[] { "theHostName", "jobmanager", "myJobName"}, + jmGroup.getScopeComponents()); + + assertEquals( + "theHostName.jobmanager.myJobName.name", + jmGroup.getMetricIdentifier("name")); + } + + @Test + public void testGenerateScopeCustom() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + + JobManagerScopeFormat tmFormat = new JobManagerScopeFormat("abc"); + JobManagerJobScopeFormat jmFormat = new JobManagerJobScopeFormat("some-constant.<job_name>", tmFormat); + + JobID jid = new JobID(); + + JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); + JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); + + assertArrayEquals( + new String[] { "some-constant", "myJobName" }, + jmGroup.getScopeComponents()); + + assertEquals( + "some-constant.myJobName.name", + jmGroup.getMetricIdentifier("name")); + } + + @Test + public void testGenerateScopeCustomWildcard() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + + JobManagerScopeFormat tmFormat = new JobManagerScopeFormat("peter"); + 
JobManagerJobScopeFormat jmFormat = new JobManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat); + + JobID jid = new JobID(); + + JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, tmFormat, "theHostName"); + JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); + + assertArrayEquals( + new String[] { "peter", "some-constant", jid.toString() }, + jmGroup.getScopeComponents()); + + assertEquals( + "peter.some-constant." + jid + ".name", + jmGroup.getMetricIdentifier("name")); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java new file mode 100644 index 0000000..39485dc --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.util.TestReporter; + +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class MetricGroupRegistrationTest { + /** + * Verifies that group methods instantiate the correct metric with the given name. + */ + @Test + public void testMetricInstantiation() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestReporter1.class.getName()); + + MetricRegistry registry = new MetricRegistry(config); + + MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); + + Counter counter = root.counter("counter"); + assertEquals(counter, TestReporter1.lastPassedMetric); + assertEquals("counter", TestReporter1.lastPassedName); + + Gauge<Object> gauge = root.gauge("gauge", new Gauge<Object>() { + @Override + public Object getValue() { + return null; + } + }); + + Assert.assertEquals(gauge, TestReporter1.lastPassedMetric); + assertEquals("gauge", TestReporter1.lastPassedName); + + Histogram histogram = root.histogram("histogram", new Histogram() { + @Override + public void update(long value) { + + } + + @Override + public long getCount() { + return 0; + } + + @Override + public HistogramStatistics getStatistics() { + return null; + } + }); + + Assert.assertEquals(histogram, TestReporter1.lastPassedMetric); + assertEquals("histogram", 
TestReporter1.lastPassedName); + registry.shutdown(); + } + + public static class TestReporter1 extends TestReporter { + + public static Metric lastPassedMetric; + public static String lastPassedName; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + lastPassedMetric = metric; + lastPassedName = metricName; + } + } + + /** + * Verifies that when attempting to create a group with the name of an existing one the existing one will be returned instead. + */ + @Test + public void testDuplicateGroupName() { + Configuration config = new Configuration(); + + MetricRegistry registry = new MetricRegistry(config); + + MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id"); + + MetricGroup group1 = root.addGroup("group"); + MetricGroup group2 = root.addGroup("group"); + MetricGroup group3 = root.addGroup("group"); + Assert.assertTrue(group1 == group2 && group2 == group3); + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java new file mode 100644 index 0000000..16fb518 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.MetricRegistry; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class MetricGroupTest { + + private MetricRegistry registry; + + private final MetricRegistry exceptionOnRegister = new ExceptionOnRegisterRegistry(); + + @Before + public void createRegistry() { + this.registry = new MetricRegistry(new Configuration()); + } + + @After + public void shutdownRegistry() { + this.registry.shutdown(); + this.registry = null; + } + + @Test + public void sameGroupOnNameCollision() { + GenericMetricGroup group = new GenericMetricGroup( + registry, new DummyAbstractMetricGroup(registry), "somegroup"); + + String groupName = "sometestname"; + MetricGroup subgroup1 = group.addGroup(groupName); + MetricGroup subgroup2 = group.addGroup(groupName); + + assertNotNull(subgroup1); + assertNotNull(subgroup2); + assertTrue(subgroup1 == subgroup2); + } + + @Test + public void closedGroupDoesNotRegisterMetrics() { + GenericMetricGroup group = new GenericMetricGroup( + exceptionOnRegister, new DummyAbstractMetricGroup(exceptionOnRegister), "testgroup"); + assertFalse(group.isClosed()); + + group.close(); + assertTrue(group.isClosed()); + + // these will fail if the registration is propagated + group.counter("testcounter"); + 
group.gauge("testgauge", new Gauge<Object>() { + @Override + public Object getValue() { return null; } + }); + } + + @Test + public void closedGroupCreatesClosedGroups() { + GenericMetricGroup group = new GenericMetricGroup(exceptionOnRegister, + new DummyAbstractMetricGroup(exceptionOnRegister), "testgroup"); + assertFalse(group.isClosed()); + + group.close(); + assertTrue(group.isClosed()); + + AbstractMetricGroup subgroup = (AbstractMetricGroup) group.addGroup("test subgroup"); + assertTrue(subgroup.isClosed()); + } + + @Test + public void tolerateMetricNameCollisions() { + final String name = "abctestname"; + GenericMetricGroup group = new GenericMetricGroup( + registry, new DummyAbstractMetricGroup(registry), "testgroup"); + + assertNotNull(group.counter(name)); + assertNotNull(group.counter(name)); + } + + @Test + public void tolerateMetricAndGroupNameCollisions() { + final String name = "abctestname"; + GenericMetricGroup group = new GenericMetricGroup( + registry, new DummyAbstractMetricGroup(registry), "testgroup"); + + assertNotNull(group.addGroup(name)); + assertNotNull(group.counter(name)); + } + + // ------------------------------------------------------------------------ + + private static class ExceptionOnRegisterRegistry extends MetricRegistry { + + public ExceptionOnRegisterRegistry() { + super(new Configuration()); + } + + @Override + public void register(Metric metric, String name, MetricGroup parent) { + fail("Metric should never be registered"); + } + + @Override + public void unregister(Metric metric, String name, MetricGroup parent) { + fail("Metric should never be un-registered"); + } + } + + // ------------------------------------------------------------------------ + + private static class DummyAbstractMetricGroup extends AbstractMetricGroup { + + public DummyAbstractMetricGroup(MetricRegistry registry) { + super(registry, new String[0]); + } + + @Override + protected void addMetric(String name, Metric metric) {} + + @Override + public 
MetricGroup addGroup(String name) { + return new DummyAbstractMetricGroup(registry); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fec1f9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java new file mode 100644 index 0000000..4da713f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.util.AbstractID; + +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class OperatorGroupTest { + + @Test + public void testGenerateScopeDefault() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + TaskMetricGroup taskGroup = new TaskMetricGroup( + registry, jmGroup, new AbstractID(), new AbstractID(), "aTaskName", 11, 0); + OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, "myOpName"); + + assertArrayEquals( + new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", "myOpName", "11" }, + opGroup.getScopeComponents()); + + assertEquals( + "theHostName.taskmanager.test-tm-id.myJobName.myOpName.11.name", + opGroup.getMetricIdentifier("name")); + + registry.shutdown(); + } +}
