[FLINK-456] Basic JM Metric Infrastructure This closes #2146
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3a9fd11 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3a9fd11 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3a9fd11 Branch: refs/heads/master Commit: a3a9fd1147aa926987420057f8305ab498519a45 Parents: a11c1c6 Author: zentol <[email protected]> Authored: Fri Jul 1 14:12:44 2016 +0200 Committer: zentol <[email protected]> Committed: Fri Jul 1 15:09:16 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/metrics/MetricRegistry.java | 14 ++- .../groups/JobManagerJobMetricGroup.java | 69 +++++++++++ .../metrics/groups/JobManagerMetricGroup.java | 104 ++++++++++++++++ .../flink/metrics/groups/JobMetricGroup.java | 99 ++------------- .../groups/TaskManagerJobMetricGroup.java | 122 +++++++++++++++++++ .../metrics/groups/TaskManagerMetricGroup.java | 12 +- .../flink/metrics/groups/TaskMetricGroup.java | 8 +- .../flink/metrics/groups/scope/ScopeFormat.java | 84 +++++++++++-- .../metrics/groups/scope/ScopeFormats.java | 27 +++- .../flink/metrics/MetricRegistryTest.java | 4 +- .../flink/metrics/groups/JobGroupTest.java | 94 -------------- .../metrics/groups/JobManagerGroupTest.java | 108 ++++++++++++++++ .../metrics/groups/JobManagerJobGroupTest.java | 90 ++++++++++++++ .../flink/metrics/groups/OperatorGroupTest.java | 2 +- .../flink/metrics/groups/TaskGroupTest.java | 6 +- .../metrics/groups/TaskManagerJobGroupTest.java | 94 ++++++++++++++ .../flink/runtime/jobmanager/JobManager.scala | 41 ++++++- .../testutils/UnregisteredTaskMetricsGroup.java | 4 +- .../runtime/testingUtils/TestingCluster.scala | 6 +- .../testingUtils/TestingJobManager.scala | 7 +- .../runtime/testingUtils/TestingUtils.scala | 6 +- .../org/apache/flink/yarn/YarnJobManager.scala | 7 +- 22 files changed, 778 insertions(+), 230 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java index f283ce3..09beef6 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java @@ -52,8 +52,10 @@ public class MetricRegistry { public static final String KEY_METRICS_REPORTER_ARGUMENTS = "metrics.reporter.arguments"; public static final String KEY_METRICS_REPORTER_INTERVAL = "metrics.reporter.interval"; + public static final String KEY_METRICS_SCOPE_NAMING_JM = "metrics.scope.jm"; public static final String KEY_METRICS_SCOPE_NAMING_TM = "metrics.scope.tm"; - public static final String KEY_METRICS_SCOPE_NAMING_JOB = "metrics.scope.job"; + public static final String KEY_METRICS_SCOPE_NAMING_JM_JOB = "metrics.scope.jm.job"; + public static final String KEY_METRICS_SCOPE_NAMING_TM_JOB = "metrics.scope.tm.job"; public static final String KEY_METRICS_SCOPE_NAMING_TASK = "metrics.scope.task"; public static final String KEY_METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator"; @@ -243,16 +245,20 @@ public class MetricRegistry { } static ScopeFormats createScopeConfig(Configuration config) { + String jmFormat = config.getString( + KEY_METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP); + String jmJobFormat = config.getString( + KEY_METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP); String tmFormat = config.getString( KEY_METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP); - String jobFormat = config.getString( - KEY_METRICS_SCOPE_NAMING_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP); + String tmJobFormat = config.getString( + 
KEY_METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP); String taskFormat = config.getString( KEY_METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP); String operatorFormat = config.getString( KEY_METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP); - return new ScopeFormats(tmFormat, jobFormat, taskFormat, operatorFormat); + return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java new file mode 100644 index 0000000..1dd0439 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerJobMetricGroup.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat; + +import javax.annotation.Nullable; +import java.util.Collections; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to + * a specific job, running on the JobManager. + */ +@Internal +public class JobManagerJobMetricGroup extends JobMetricGroup { + + /** The metrics group that contains this group */ + private final JobManagerMetricGroup parent; + + public JobManagerJobMetricGroup( + MetricRegistry registry, + JobManagerMetricGroup parent, + JobID jobId, + @Nullable String jobName) { + + this(registry, checkNotNull(parent), registry.getScopeFormats().getJobManagerJobFormat(), jobId, jobName); + } + + public JobManagerJobMetricGroup( + MetricRegistry registry, + JobManagerMetricGroup parent, + JobManagerJobScopeFormat scopeFormat, + JobID jobId, + @Nullable String jobName) { + + super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName)); + + this.parent = checkNotNull(parent); + } + + public final JobManagerMetricGroup parent() { + return parent; + } + + @Override + protected Iterable<? 
extends ComponentMetricGroup> subComponents() { + return Collections.emptyList(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java new file mode 100644 index 0000000..67e1117 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobManagerMetricGroup.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat; + +import java.util.HashMap; +import java.util.Map; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} representing a JobManager. 
+ * + * <p>Contains extra logic for adding and removing the metric groups of the jobs + * ({@link JobManagerJobMetricGroup}) that run on this JobManager. + */ +@Internal +public class JobManagerMetricGroup extends ComponentMetricGroup { + + private final Map<JobID, JobManagerJobMetricGroup> jobs = new HashMap<>(); + + private final String hostname; + + public JobManagerMetricGroup(MetricRegistry registry, String hostname) { + this(registry, registry.getScopeFormats().getJobManagerFormat(), hostname); + } + + public JobManagerMetricGroup( + MetricRegistry registry, + JobManagerScopeFormat scopeFormat, + String hostname) { + + super(registry, scopeFormat.formatScope(hostname)); + this.hostname = hostname; + } + + public String hostname() { + return hostname; + } + + // ------------------------------------------------------------------------ + // job groups + // ------------------------------------------------------------------------ + + public JobManagerJobMetricGroup addJob( + JobID jobId, + String jobName) { + // get or create a jobs metric group + JobManagerJobMetricGroup currentJobGroup; + synchronized (this) { + if (!isClosed()) { + currentJobGroup = jobs.get(jobId); + + if (currentJobGroup == null || currentJobGroup.isClosed()) { + currentJobGroup = new JobManagerJobMetricGroup(registry, this, jobId, jobName); + jobs.put(jobId, currentJobGroup); + } + return currentJobGroup; + } else { + return null; + } + } + } + + public void removeJob(JobID jobId) { + if (jobId == null) { + return; + } + + synchronized (this) { + JobManagerJobMetricGroup containedGroup = jobs.remove(jobId); + if (containedGroup != null) { + containedGroup.close(); + } + } + } + + public int numRegisteredJobMetricGroups() { + return jobs.size(); + } + + @Override + protected Iterable<?
extends ComponentMetricGroup> subComponents() { + return jobs.values(); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java index f816278..f7dfc78 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/JobMetricGroup.java @@ -21,66 +21,36 @@ package org.apache.flink.metrics.groups; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat; -import org.apache.flink.util.AbstractID; import javax.annotation.Nullable; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to - * a specific job, running on the TaskManager. - * - * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}). + * Special abstract {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to + * a specific job. 
*/ @Internal -public class JobMetricGroup extends ComponentMetricGroup { - - /** The metrics group that contains this group */ - private final TaskManagerMetricGroup parent; - - /** Map from execution attempt ID (task identifier) to task metrics */ - private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>(); +public abstract class JobMetricGroup extends ComponentMetricGroup { /** The ID of the job represented by this metrics group */ - private final JobID jobId; + protected final JobID jobId; /** The name of the job represented by this metrics group */ @Nullable - private final String jobName; + protected final String jobName; // ------------------------------------------------------------------------ - public JobMetricGroup( + protected JobMetricGroup( MetricRegistry registry, - TaskManagerMetricGroup parent, JobID jobId, - @Nullable String jobName) { + @Nullable String jobName, + String[] scope) { + super(registry, scope); - this(registry, checkNotNull(parent), registry.getScopeFormats().getJobFormat(), jobId, jobName); - } - - public JobMetricGroup( - MetricRegistry registry, - TaskManagerMetricGroup parent, - TaskManagerJobScopeFormat scopeFormat, - JobID jobId, - @Nullable String jobName) { - - super(registry, scopeFormat.formatScope(parent, jobId, jobName)); - - this.parent = checkNotNull(parent); - this.jobId = checkNotNull(jobId); + this.jobId = jobId; this.jobName = jobName; } - public final TaskManagerMetricGroup parent() { - return parent; - } - public JobID jobId() { return jobId; } @@ -89,53 +59,4 @@ public class JobMetricGroup extends ComponentMetricGroup { public String jobName() { return jobName; } - - // ------------------------------------------------------------------------ - // adding / removing tasks - // ------------------------------------------------------------------------ - - public TaskMetricGroup addTask( - AbstractID vertexId, - AbstractID executionId, - String taskName, - int subtaskIndex, - int attemptNumber) { - - 
checkNotNull(executionId); - - synchronized (this) { - if (!isClosed()) { - TaskMetricGroup task = new TaskMetricGroup(registry, this, - vertexId, executionId, taskName, subtaskIndex, attemptNumber); - tasks.put(executionId, task); - return task; - } else { - return null; - } - } - } - - public void removeTaskMetricGroup(AbstractID executionId) { - checkNotNull(executionId); - - boolean removeFromParent = false; - synchronized (this) { - if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) { - // this call removed the last task. close this group. - removeFromParent = true; - close(); - } - } - - // IMPORTANT: removing from the parent must happen while holding the this group's lock, - // because it would violate the "first parent then subgroup" lock acquisition order - if (removeFromParent) { - parent.removeJobMetricsGroup(jobId, this); - } - } - - @Override - protected Iterable<? extends ComponentMetricGroup> subComponents() { - return tasks.values(); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java new file mode 100644 index 0000000..fdaf1de --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerJobMetricGroup.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat; +import org.apache.flink.util.AbstractID; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to + * a specific job, running on the TaskManager. + * + * <p>Contains extra logic for adding Tasks ({@link TaskMetricGroup}). 
+ */ +@Internal +public class TaskManagerJobMetricGroup extends JobMetricGroup { + + /** The metrics group that contains this group */ + private final TaskManagerMetricGroup parent; + + /** Map from execution attempt ID (task identifier) to task metrics */ + private final Map<AbstractID, TaskMetricGroup> tasks = new HashMap<>(); + + // ------------------------------------------------------------------------ + + public TaskManagerJobMetricGroup( + MetricRegistry registry, + TaskManagerMetricGroup parent, + JobID jobId, + @Nullable String jobName) { + + this(registry, checkNotNull(parent), registry.getScopeFormats().getTaskManagerJobFormat(), jobId, jobName); + } + + public TaskManagerJobMetricGroup( + MetricRegistry registry, + TaskManagerMetricGroup parent, + TaskManagerJobScopeFormat scopeFormat, + JobID jobId, + @Nullable String jobName) { + + super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName)); + + this.parent = checkNotNull(parent); + } + + public final TaskManagerMetricGroup parent() { + return parent; + } + + // ------------------------------------------------------------------------ + // adding / removing tasks + // ------------------------------------------------------------------------ + + public TaskMetricGroup addTask( + AbstractID vertexId, + AbstractID executionId, + String taskName, + int subtaskIndex, + int attemptNumber) { + + checkNotNull(executionId); + + synchronized (this) { + if (!isClosed()) { + TaskMetricGroup task = new TaskMetricGroup(registry, this, + vertexId, executionId, taskName, subtaskIndex, attemptNumber); + tasks.put(executionId, task); + return task; + } else { + return null; + } + } + } + + public void removeTaskMetricGroup(AbstractID executionId) { + checkNotNull(executionId); + + boolean removeFromParent = false; + synchronized (this) { + if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) { + // this call removed the last task. close this group. 
+ removeFromParent = true; + close(); + } + } + + // IMPORTANT: removing from the parent must not happen while holding this group's lock, + // because it would violate the "first parent then subgroup" lock acquisition order + if (removeFromParent) { + parent.removeJobMetricsGroup(jobId, this); + } + } + + @Override + protected Iterable<? extends ComponentMetricGroup> subComponents() { + return tasks.values(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java index 3cb3936..2b2b201 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskManagerMetricGroup.java @@ -36,7 +36,7 @@ import java.util.Map; @Internal public class TaskManagerMetricGroup extends ComponentMetricGroup { - private final Map<JobID, JobMetricGroup> jobs = new HashMap<>(); + private final Map<JobID, TaskManagerJobMetricGroup> jobs = new HashMap<>(); private final String hostname; @@ -82,12 +82,12 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup { // because it might lead to a deadlock while (true) { // get or create a jobs metric group - JobMetricGroup currentJobGroup; + TaskManagerJobMetricGroup currentJobGroup; synchronized (this) { currentJobGroup = jobs.get(jobId); - + if (currentJobGroup == null || currentJobGroup.isClosed()) { - currentJobGroup = new JobMetricGroup(registry, this, jobId, jobName); + currentJobGroup = new TaskManagerJobMetricGroup(registry, this, jobId, jobName); jobs.put(jobId, currentJobGroup); } } @@ -106,14 +106,14 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup { } } -
public void removeJobMetricsGroup(JobID jobId, JobMetricGroup group) { + public void removeJobMetricsGroup(JobID jobId, TaskManagerJobMetricGroup group) { if (jobId == null || group == null || !group.isClosed()) { return; } synchronized (this) { // optimistically remove the currently contained group, and check later if it was correct - JobMetricGroup containedGroup = jobs.remove(jobId); + TaskManagerJobMetricGroup containedGroup = jobs.remove(jobId); // check if another group was actually contained, and restore that one if (containedGroup != null && containedGroup != group) { http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java index 784578b..c0428ac 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/TaskMetricGroup.java @@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public class TaskMetricGroup extends ComponentMetricGroup { /** The job metrics group containing this task metrics group */ - private final JobMetricGroup parent; + private final TaskManagerJobMetricGroup parent; private final Map<String, OperatorMetricGroup> operators = new HashMap<>(); @@ -61,7 +61,7 @@ public class TaskMetricGroup extends ComponentMetricGroup { public TaskMetricGroup( MetricRegistry registry, - JobMetricGroup parent, + TaskManagerJobMetricGroup parent, @Nullable AbstractID vertexId, AbstractID executionId, @Nullable String taskName, @@ -74,7 +74,7 @@ public class TaskMetricGroup extends ComponentMetricGroup { public TaskMetricGroup( MetricRegistry registry, - JobMetricGroup parent, + TaskManagerJobMetricGroup parent, 
TaskScopeFormat scopeFormat, @Nullable AbstractID vertexId, AbstractID executionId, @@ -99,7 +99,7 @@ public class TaskMetricGroup extends ComponentMetricGroup { // properties // ------------------------------------------------------------------------ - public final JobMetricGroup parent() { + public final TaskManagerJobMetricGroup parent() { return parent; } http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java index 9637f65..b73cf51 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormat.java @@ -19,7 +19,9 @@ package org.apache.flink.metrics.groups.scope; import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.groups.JobManagerMetricGroup; import org.apache.flink.metrics.groups.JobMetricGroup; +import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.metrics.groups.TaskMetricGroup; import org.apache.flink.util.AbstractID; @@ -66,29 +68,47 @@ public abstract class ScopeFormat { // Scope Variables // ------------------------------------------------------------------------ + public static final String SCOPE_ACTOR_HOST = asVariable("host"); + + // ----- Job Manager ---- + + /** The default scope format of the JobManager component: {@code "<host>.jobmanager"} */ + public static final String DEFAULT_SCOPE_JOBMANAGER_COMPONENT = + concat(SCOPE_ACTOR_HOST, "jobmanager"); + + /** The default scope format of JobManager metrics: {@code "<host>.jobmanager"} */ + public static final String DEFAULT_SCOPE_JOBMANAGER_GROUP = 
DEFAULT_SCOPE_JOBMANAGER_COMPONENT; + // ----- Task Manager ---- - public static final String SCOPE_TASKMANAGER_HOST = asVariable("host"); public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id"); /** The default scope format of the TaskManager component: {@code "<host>.taskmanager.<tm_id>"} */ public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT = - concat(SCOPE_TASKMANAGER_HOST, "taskmanager", SCOPE_TASKMANAGER_ID); + concat(SCOPE_ACTOR_HOST, "taskmanager", SCOPE_TASKMANAGER_ID); /** The default scope format of TaskManager metrics: {@code "<host>.taskmanager.<tm_id>"} */ public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = DEFAULT_SCOPE_TASKMANAGER_COMPONENT; - // ----- Job on Task Manager ---- + // ----- Job ----- public static final String SCOPE_JOB_ID = asVariable("job_id"); public static final String SCOPE_JOB_NAME = asVariable("job_name"); /** The default scope format for the job component: {@code "<job_name>"} */ - public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT = SCOPE_JOB_NAME; + public static final String DEFAULT_SCOPE_JOB_COMPONENT = SCOPE_JOB_NAME; + + // ----- Job on Job Manager ---- + + /** The default scope format for all job metrics on a jobmanager: {@code "<host>.jobmanager.<job_name>"} */ + public static final String DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP = + concat(DEFAULT_SCOPE_JOBMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT); + + // ----- Job on Task Manager ---- - /** The default scope format for all job metrics: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */ + /** The default scope format for all job metrics on a taskmanager: {@code "<host>.taskmanager.<tm_id>.<job_name>"} */ public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP = - concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_TASKMANAGER_JOB_COMPONENT); + concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, DEFAULT_SCOPE_JOB_COMPONENT); // ----- Task ---- @@ -125,13 +145,31 @@ public abstract class ScopeFormat { // 
------------------------------------------------------------------------ /** + * The scope format for the {@link JobManagerMetricGroup}. + */ + public static class JobManagerScopeFormat extends ScopeFormat { + + public JobManagerScopeFormat(String format) { + super(format, null, new String[] { + SCOPE_ACTOR_HOST + }); + } + + public String[] formatScope(String hostname) { + final String[] template = copyTemplate(); + final String[] values = { hostname }; + return bindVariables(template, values); + } + } + + /** * The scope format for the {@link TaskManagerMetricGroup}. */ public static class TaskManagerScopeFormat extends ScopeFormat { public TaskManagerScopeFormat(String format) { super(format, null, new String[] { - SCOPE_TASKMANAGER_HOST, + SCOPE_ACTOR_HOST, SCOPE_TASKMANAGER_ID }); } @@ -148,11 +186,35 @@ public abstract class ScopeFormat { /** * The scope format for the {@link JobMetricGroup}. */ + public static class JobManagerJobScopeFormat extends ScopeFormat { + + public JobManagerJobScopeFormat(String format, JobManagerScopeFormat parentFormat) { + super(format, parentFormat, new String[] { + SCOPE_ACTOR_HOST, + SCOPE_JOB_ID, + SCOPE_JOB_NAME + }); + } + + public String[] formatScope(JobManagerMetricGroup parent, JobID jid, String jobName) { + final String[] template = copyTemplate(); + final String[] values = { + parent.hostname(), + valueOrNull(jid), + valueOrNull(jobName) + }; + return bindVariables(template, values); + } + } + + /** + * The scope format for the {@link TaskManagerJobMetricGroup}.
+ */ public static class TaskManagerJobScopeFormat extends ScopeFormat { public TaskManagerJobScopeFormat(String format, TaskManagerScopeFormat parentFormat) { super(format, parentFormat, new String[] { - SCOPE_TASKMANAGER_HOST, + SCOPE_ACTOR_HOST, SCOPE_TASKMANAGER_ID, SCOPE_JOB_ID, SCOPE_JOB_NAME @@ -180,7 +242,7 @@ public abstract class ScopeFormat { public TaskScopeFormat(String format, TaskManagerJobScopeFormat parentFormat) { super(format, parentFormat, new String[] { - SCOPE_TASKMANAGER_HOST, + SCOPE_ACTOR_HOST, SCOPE_TASKMANAGER_ID, SCOPE_JOB_ID, SCOPE_JOB_NAME, @@ -193,7 +255,7 @@ public abstract class ScopeFormat { } public String[] formatScope( - JobMetricGroup parent, + TaskManagerJobMetricGroup parent, AbstractID vertexId, AbstractID attemptId, String taskName, int subtask, int attemptNumber) { @@ -222,7 +284,7 @@ public abstract class ScopeFormat { public OperatorScopeFormat(String format, TaskScopeFormat parentFormat) { super(format, parentFormat, new String[] { - SCOPE_TASKMANAGER_HOST, + SCOPE_ACTOR_HOST, SCOPE_TASKMANAGER_ID, SCOPE_JOB_ID, SCOPE_JOB_NAME, http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java index 1451637..978e761 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/groups/scope/ScopeFormats.java @@ -19,6 +19,8 @@ package org.apache.flink.metrics.groups.scope; import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat; +import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat; import 
org.apache.flink.metrics.groups.scope.ScopeFormat.OperatorScopeFormat; import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat; import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat; @@ -32,6 +34,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class ScopeFormats { + private final JobManagerScopeFormat jobManagerFormat; + private final JobManagerJobScopeFormat jobManagerJobFormat; private final TaskManagerScopeFormat taskManagerFormat; private final TaskManagerJobScopeFormat taskManagerJobFormat; private final TaskScopeFormat taskFormat; @@ -43,6 +47,11 @@ public class ScopeFormats { * Creates all default scope formats. */ public ScopeFormats() { + this.jobManagerFormat = new JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT); + + this.jobManagerJobFormat = new JobManagerJobScopeFormat( + ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, this.jobManagerFormat); + this.taskManagerFormat = new TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT); this.taskManagerJobFormat = new TaskManagerJobScopeFormat( @@ -59,11 +68,15 @@ public class ScopeFormats { * Creates all scope formats, based on the given scope format strings. */ public ScopeFormats( + String jobManagerFormat, + String jobManagerJobFormat, String taskManagerFormat, String taskManagerJobFormat, String taskFormat, String operatorFormat) { + this.jobManagerFormat = new JobManagerScopeFormat(jobManagerFormat); + this.jobManagerJobFormat = new JobManagerJobScopeFormat(jobManagerJobFormat, this.jobManagerFormat); this.taskManagerFormat = new TaskManagerScopeFormat(taskManagerFormat); this.taskManagerJobFormat = new TaskManagerJobScopeFormat(taskManagerJobFormat, this.taskManagerFormat); this.taskFormat = new TaskScopeFormat(taskFormat, this.taskManagerJobFormat); @@ -74,11 +87,15 @@ public class ScopeFormats { * Creates a {@code ScopeFormats} with the given scope formats. 
*/ public ScopeFormats( + JobManagerScopeFormat jobManagerFormat, + JobManagerJobScopeFormat jobManagerJobFormat, TaskManagerScopeFormat taskManagerFormat, TaskManagerJobScopeFormat taskManagerJobFormat, TaskScopeFormat taskFormat, OperatorScopeFormat operatorFormat) { + this.jobManagerFormat = checkNotNull(jobManagerFormat); + this.jobManagerJobFormat = checkNotNull(jobManagerJobFormat); this.taskManagerFormat = checkNotNull(taskManagerFormat); this.taskManagerJobFormat = checkNotNull(taskManagerJobFormat); this.taskFormat = checkNotNull(taskFormat); @@ -87,14 +104,22 @@ public class ScopeFormats { // ------------------------------------------------------------------------ + public JobManagerScopeFormat getJobManagerFormat() { + return this.jobManagerFormat; + } + public TaskManagerScopeFormat getTaskManagerFormat() { return this.taskManagerFormat; } - public TaskManagerJobScopeFormat getJobFormat() { + public TaskManagerJobScopeFormat getTaskManagerJobFormat() { return this.taskManagerJobFormat; } + public JobManagerJobScopeFormat getJobManagerJobFormat() { + return this.jobManagerJobFormat; + } + public TaskScopeFormat getTaskFormat() { return this.taskFormat; } http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java index 8b71816..77acd3c 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/MetricRegistryTest.java @@ -164,14 +164,14 @@ public class MetricRegistryTest extends TestLogger { Configuration config = new Configuration(); config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TM, "A"); - config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_JOB, "B"); + 
config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TM_JOB, "B"); config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_TASK, "C"); config.setString(MetricRegistry.KEY_METRICS_SCOPE_NAMING_OPERATOR, "D"); ScopeFormats scopeConfig = MetricRegistry.createScopeConfig(config); assertEquals("A", scopeConfig.getTaskManagerFormat().format()); - assertEquals("B", scopeConfig.getJobFormat().format()); + assertEquals("B", scopeConfig.getTaskManagerJobFormat().format()); assertEquals("C", scopeConfig.getTaskFormat().format()); assertEquals("D", scopeConfig.getOperatorFormat().format()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java deleted file mode 100644 index 4bcb1ee..0000000 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobGroupTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.metrics.groups; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricRegistry; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat; -import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat; - -import org.junit.Test; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -public class JobGroupTest { - - @Test - public void testGenerateScopeDefault() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); - JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); - - assertArrayEquals( - new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName"}, - jmGroup.getScopeComponents()); - - assertEquals( - "theHostName.taskmanager.test-tm-id.myJobName", - jmGroup.getScopeString()); - registry.shutdown(); - } - - @Test - public void testGenerateScopeCustom() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("abc"); - TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat); - - JobID jid = new JobID(); - - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); - JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); - - assertArrayEquals( - new String[] { "some-constant", "myJobName" }, - jmGroup.getScopeComponents()); - - assertEquals( - "some-constant.myJobName", - jmGroup.getScopeString()); - registry.shutdown(); - } - - @Test - public void testGenerateScopeCustomWildcard() { - MetricRegistry registry = new MetricRegistry(new Configuration()); - - TaskManagerScopeFormat tmFormat = 
new TaskManagerScopeFormat("peter.<tm_id>"); - TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat); - - JobID jid = new JobID(); - - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id"); - JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); - - assertArrayEquals( - new String[] { "peter", "test-tm-id", "some-constant", jid.toString() }, - jmGroup.getScopeComponents()); - - assertEquals( - "peter.test-tm-id.some-constant." + jid, - jmGroup.getScopeString()); - registry.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java new file mode 100644 index 0000000..8853f20 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerGroupTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class JobManagerGroupTest { + + // ------------------------------------------------------------------------ + // adding and removing jobs + // ------------------------------------------------------------------------ + + @Test + public void addAndRemoveJobs() { + final JobManagerMetricGroup group = new JobManagerMetricGroup( + new MetricRegistry(new Configuration()), "localhost"); + + final JobID jid1 = new JobID(); + final JobID jid2 = new JobID(); + + final String jobName1 = "testjob"; + final String jobName2 = "anotherJob"; + + JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, jobName1); + JobManagerJobMetricGroup jmJobGroup12 = group.addJob(jid1, jobName1); + JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, jobName2); + + assertEquals(jmJobGroup11, jmJobGroup12); + + assertEquals(2, group.numRegisteredJobMetricGroups()); + + group.removeJob(jid1); + + assertTrue(jmJobGroup11.isClosed()); + assertEquals(1, group.numRegisteredJobMetricGroups()); + + group.removeJob(jid2); + + assertTrue(jmJobGroup21.isClosed()); + assertEquals(0, group.numRegisteredJobMetricGroups()); + } + + @Test + public void testCloseClosesAll() { + final JobManagerMetricGroup group = new JobManagerMetricGroup( + new MetricRegistry(new Configuration()), "localhost"); + + final JobID jid1 = new JobID(); + final JobID jid2 = new JobID(); + + final String jobName1 = "testjob"; + final String jobName2 = "anotherJob"; + + 
JobManagerJobMetricGroup jmJobGroup11 = group.addJob(jid1, jobName1); + JobManagerJobMetricGroup jmJobGroup21 = group.addJob(jid2, jobName2); + + group.close(); + + assertTrue(jmJobGroup11.isClosed()); + assertTrue(jmJobGroup21.isClosed()); + } + + // ------------------------------------------------------------------------ + // scope name tests + // ------------------------------------------------------------------------ + + @Test + public void testGenerateScopeDefault() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); + + assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents()); + assertEquals("localhost.jobmanager", group.getScopeString()); + } + + @Test + public void testGenerateScopeCustom() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + JobManagerScopeFormat format = new JobManagerScopeFormat("constant.<host>.foo.<host>"); + JobManagerMetricGroup group = new JobManagerMetricGroup(registry, format, "host"); + + assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents()); + assertEquals("constant.host.foo.host", group.getScopeString()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java new file mode 100644 index 0000000..3833cb8 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/JobManagerJobGroupTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerJobScopeFormat; +import org.apache.flink.metrics.groups.scope.ScopeFormat.JobManagerScopeFormat; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class JobManagerJobGroupTest { + + @Test + public void testGenerateScopeDefault() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + + JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); + JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + + assertArrayEquals( + new String[] { "theHostName", "jobmanager", "myJobName"}, + jmGroup.getScopeComponents()); + + assertEquals( + "theHostName.jobmanager.myJobName", + jmGroup.getScopeString()); + } + + @Test + public void testGenerateScopeCustom() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + + JobManagerScopeFormat tmFormat = new JobManagerScopeFormat("abc"); + JobManagerJobScopeFormat jmFormat = new 
JobManagerJobScopeFormat("some-constant.<job_name>", tmFormat); + + JobID jid = new JobID(); + + JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); + JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); + + assertArrayEquals( + new String[] { "some-constant", "myJobName" }, + jmGroup.getScopeComponents()); + + assertEquals( + "some-constant.myJobName", + jmGroup.getScopeString()); + } + + @Test + public void testGenerateScopeCustomWildcard() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + + JobManagerScopeFormat tmFormat = new JobManagerScopeFormat("peter"); + JobManagerJobScopeFormat jmFormat = new JobManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat); + + JobID jid = new JobID(); + + JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, tmFormat, "theHostName"); + JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); + + assertArrayEquals( + new String[] { "peter", "some-constant", jid.toString() }, + jmGroup.getScopeComponents()); + + assertEquals( + "peter.some-constant." 
+ jid, + jmGroup.getScopeString()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java index c0c8842..9641632 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/OperatorGroupTest.java @@ -35,7 +35,7 @@ public class OperatorGroupTest { MetricRegistry registry = new MetricRegistry(new Configuration()); TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); - JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); TaskMetricGroup taskGroup = new TaskMetricGroup( registry, jmGroup, new AbstractID(), new AbstractID(), "aTaskName", 11, 0); OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, "myOpName"); http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java index 88f425b..357852a 100644 --- a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskGroupTest.java @@ -60,7 +60,7 @@ public class TaskGroupTest { AbstractID executionId = new AbstractID(); TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, 
"theHostName", "test-tm-id"); - JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); TaskMetricGroup taskGroup = new TaskMetricGroup(registry, jmGroup, vertexId, executionId, "aTaskName", 13, 2); assertArrayEquals( @@ -86,7 +86,7 @@ public class TaskGroupTest { AbstractID executionId = new AbstractID(); TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); - JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, jid, "myJobName"); + TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName"); TaskMetricGroup taskGroup = new TaskMetricGroup( registry, jmGroup, taskFormat, vertexId, executionId, "aTaskName", 13, 2); @@ -114,7 +114,7 @@ public class TaskGroupTest { AbstractID executionId = new AbstractID(); TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); - JobMetricGroup jmGroup = new JobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); TaskMetricGroup taskGroup = new TaskMetricGroup( registry, jmGroup, format, new AbstractID(), executionId, "aTaskName", 13, 1); http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java new file mode 100644 index 0000000..5cec70b --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/metrics/groups/TaskManagerJobGroupTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.groups; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricRegistry; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerJobScopeFormat; +import org.apache.flink.metrics.groups.scope.ScopeFormat.TaskManagerScopeFormat; + +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class TaskManagerJobGroupTest { + + @Test + public void testGenerateScopeDefault() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); + + assertArrayEquals( + new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName"}, + jmGroup.getScopeComponents()); + + assertEquals( + "theHostName.taskmanager.test-tm-id.myJobName", + jmGroup.getScopeString()); + registry.shutdown(); + } + + @Test + public void testGenerateScopeCustom() { + MetricRegistry registry = new MetricRegistry(new 
Configuration()); + + TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("abc"); + TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("some-constant.<job_name>", tmFormat); + + JobID jid = new JobID(); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); + + assertArrayEquals( + new String[] { "some-constant", "myJobName" }, + jmGroup.getScopeComponents()); + + assertEquals( + "some-constant.myJobName", + jmGroup.getScopeString()); + registry.shutdown(); + } + + @Test + public void testGenerateScopeCustomWildcard() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + + TaskManagerScopeFormat tmFormat = new TaskManagerScopeFormat("peter.<tm_id>"); + TaskManagerJobScopeFormat jmFormat = new TaskManagerJobScopeFormat("*.some-constant.<job_id>", tmFormat); + + JobID jid = new JobID(); + + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, tmFormat, "theHostName", "test-tm-id"); + JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jmFormat, jid, "myJobName"); + + assertArrayEquals( + new String[] { "peter", "test-tm-id", "some-constant", jid.toString() }, + jmGroup.getScopeComponents()); + + assertEquals( + "peter.test-tm-id.some-constant." 
+ jid, + jmGroup.getScopeString()); + registry.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 60f0a97..be1caa5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -33,6 +33,8 @@ import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.io.InputSplitAssigner +import org.apache.flink.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.metrics.groups.JobManagerMetricGroup import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.blob.BlobServer @@ -124,7 +126,8 @@ class JobManager( protected val submittedJobGraphs : SubmittedJobGraphStore, protected val checkpointRecoveryFactory : CheckpointRecoveryFactory, protected val savepointStore: SavepointStore, - protected val jobRecoveryTimeout: FiniteDuration) + protected val jobRecoveryTimeout: FiniteDuration, + protected val metricsRegistry: Option[FlinkMetricRegistry]) extends FlinkActor with LeaderSessionMessageFilter // mixin oder is important, we want filtering after logging with LogMessages // mixin order is important, we want first logging @@ -149,6 +152,16 @@ class JobManager( var leaderSessionID: Option[UUID] = None + protected val jobManagerMetricGroup : Option[JobManagerMetricGroup] = metricsRegistry match { + case Some(registry) 
=> + val host = flinkConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + Option(new JobManagerMetricGroup( + registry, NetUtils.ipAddressToUrlString(InetAddress.getByName(host)))) + case None => + log.warn("Could not instantiate JobManager metrics.") + None + } + /** Futures which have to be completed before terminating the job manager */ var futuresToComplete: Option[Seq[Future[Unit]]] = None @@ -269,6 +282,13 @@ class JobManager( // shut down the extra thread pool for futures executorService.shutdown() + // failsafe shutdown of the metrics registry + try { + metricsRegistry.map(_.shutdown()) + } catch { + case t: Exception => log.error("MetricRegistry did not shutdown properly.", t) + } + log.debug(s"Job manager ${self.path} is completely stopped.") } @@ -2266,7 +2286,8 @@ object JobManager { SubmittedJobGraphStore, CheckpointRecoveryFactory, SavepointStore, - FiniteDuration // timeout for job recovery + FiniteDuration, // timeout for job recovery + Option[FlinkMetricRegistry] ) = { val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) @@ -2358,6 +2379,13 @@ object JobManager { } } + val metricRegistry = try { + Option(new FlinkMetricRegistry(configuration)) + } catch { + case _: Exception => + None + } + (executorService, instanceManager, scheduler, @@ -2369,7 +2397,8 @@ object JobManager { submittedJobGraphs, checkpointRecoveryFactory, savepointStore, - jobRecoveryTimeout) + jobRecoveryTimeout, + metricRegistry) } /** @@ -2432,7 +2461,8 @@ object JobManager { submittedJobGraphs, checkpointRecoveryFactory, savepointStore, - jobRecoveryTimeout) = createJobManagerComponents( + jobRecoveryTimeout, + metricsRegistry) = createJobManagerComponents( configuration, None) @@ -2458,7 +2488,8 @@ object JobManager { submittedJobGraphs, checkpointRecoveryFactory, savepointStore, - jobRecoveryTimeout) + jobRecoveryTimeout, + metricsRegistry) val jobManager: ActorRef = jobManagerActorName match { case Some(actorName) => 
actorSystem.actorOf(jobManagerProps, actorName) http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java index 6d3f768..3d8f298 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java @@ -24,7 +24,7 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.MetricRegistry; import org.apache.flink.metrics.groups.IOMetricGroup; -import org.apache.flink.metrics.groups.JobMetricGroup; +import org.apache.flink.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.metrics.groups.TaskMetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; @@ -60,7 +60,7 @@ public class UnregisteredTaskMetricsGroup extends TaskMetricGroup { } } - private static class DummyJobMetricGroup extends JobMetricGroup { + private static class DummyJobMetricGroup extends TaskManagerJobMetricGroup { public DummyJobMetricGroup() { super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob"); http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 763bd36..b4ba40b 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -107,7 +107,8 @@ class TestingCluster( submittedJobsGraphs, checkpointRecoveryFactory, savepointStore, - jobRecoveryTimeout) = JobManager.createJobManagerComponents( + jobRecoveryTimeout, + metricRegistry) = JobManager.createJobManagerComponents( config, createLeaderElectionService()) @@ -128,7 +129,8 @@ class TestingCluster( submittedJobsGraphs, checkpointRecoveryFactory, savepointStore, - jobRecoveryTimeout)) + jobRecoveryTimeout, + metricRegistry)) val dispatcherJobManagerProps = if (synchronousDispatcher) { // disable asynchronous futures (e.g. accumulator update in Heartbeat) http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index e854b13..04689c6 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -21,6 +21,7 @@ package org.apache.flink.runtime.testingUtils import akka.actor.ActorRef import org.apache.flink.configuration.Configuration +import org.apache.flink.metrics.MetricRegistry import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory @@ -50,7 +51,8 @@ class 
TestingJobManager( submittedJobGraphs : SubmittedJobGraphStore, checkpointRecoveryFactory : CheckpointRecoveryFactory, savepointStore : SavepointStore, - jobRecoveryTimeout: FiniteDuration) + jobRecoveryTimeout : FiniteDuration, + metricRegistry : Option[MetricRegistry]) extends JobManager( flinkConfiguration, executorService, @@ -64,5 +66,6 @@ class TestingJobManager( submittedJobGraphs, checkpointRecoveryFactory, savepointStore, - jobRecoveryTimeout) + jobRecoveryTimeout, + metricRegistry) with TestingJobManagerLike {} http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index b67e319..2f43d38 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -358,7 +358,8 @@ object TestingUtils { submittedJobGraphs, checkpointRecoveryFactory, savepointStore, - jobRecoveryTimeout) = JobManager.createJobManagerComponents( + jobRecoveryTimeout, + metricsRegistry) = JobManager.createJobManagerComponents( configuration, None ) @@ -380,7 +381,8 @@ object TestingUtils { leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, - jobRecoveryTimeout) + jobRecoveryTimeout, + metricsRegistry) val jobManager: ActorRef = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME) http://git-wip-us.apache.org/repos/asf/flink/blob/a3a9fd11/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index 23b3adc..3df1adc 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -24,6 +24,7 @@ import akka.actor.ActorRef import org.apache.flink.api.common.JobID import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} +import org.apache.flink.metrics.MetricRegistry import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} import org.apache.flink.runtime.clusterframework.ApplicationStatus import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory @@ -70,7 +71,8 @@ class YarnJobManager( submittedJobGraphs : SubmittedJobGraphStore, checkpointRecoveryFactory : CheckpointRecoveryFactory, savepointStore: SavepointStore, - jobRecoveryTimeout: FiniteDuration) + jobRecoveryTimeout: FiniteDuration, + metricsRegistry: Option[MetricRegistry]) extends JobManager( flinkConfiguration, executorService, @@ -84,7 +86,8 @@ class YarnJobManager( submittedJobGraphs, checkpointRecoveryFactory, savepointStore, - jobRecoveryTimeout) { + jobRecoveryTimeout, + metricsRegistry) { val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds val YARN_HEARTBEAT_DELAY: FiniteDuration =
