Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2363#discussion_r77527637
--- Diff:
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
---
@@ -35,109 +46,111 @@
final Map<String, TaskManagerMetricStore> taskManagers = new
HashMap<>();
final Map<String, JobMetricStore> jobs = new HashMap<>();
- /**
- * Adds a metric to this MetricStore.
- *
- * @param name the metric identifier
- * @param value the metric value
- */
- public void add(String name, Object value) {
- TaskManagerMetricStore tm;
- JobMetricStore job;
- TaskMetricStore task;
-
+ public void add(MetricDump metric) {
try {
- String[] components = name.split(":");
- switch (components[0]) {
- /**
- * JobManagerMetricStore metric
- * format: 0:<user_scope>.<name>
- */
- case "0":
- jobManager.metrics.put(components[1],
value);
- break;
- /**
- * TaskManager metric
- * format: 1:<tm_id>:<user_scope>.<name>
- */
- case "1":
- if (components.length != 3) {
- break;
- }
- tm = taskManagers.get(components[1]);
+ QueryScopeInfo info = metric.scopeInfo;
+ TaskManagerMetricStore tm;
+ JobMetricStore job;
+ TaskMetricStore task;
+
+ String name = info.scope.isEmpty()
+ ? metric.name
+ : info.scope + "." + metric.name;
+
+ if (name.isEmpty()) { // malformed transmission
+ return;
+ }
+
+ switch (info.getCategory()) {
+ case INFO_CATEGORY_JM:
+ addMetric(jobManager.metrics, name,
metric);
+ case INFO_CATEGORY_TM:
+ String tmID =
((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
+ tm = taskManagers.get(tmID);
if (tm == null) {
tm = new
TaskManagerMetricStore();
- taskManagers.put(components[1],
tm);
+ taskManagers.put(tmID, tm);
}
- tm.metrics.put(components[2], value);
+ addMetric(tm.metrics, name, metric);
break;
- /**
- * Job metric
- * format: 2:<job_id>:<user_scope>.<name>
- */
- case "2":
- if (components.length != 3) {
- break;
- }
- job = jobs.get(components[1]);
+ case INFO_CATEGORY_JOB:
+ QueryScopeInfo.JobQueryScopeInfo
jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
+ job = jobs.get(jobInfo.jobID);
if (job == null) {
job = new JobMetricStore();
- jobs.put(components[1], job);
+ jobs.put(jobInfo.jobID, job);
}
- job.metrics.put(components[2], value);
+ addMetric(job.metrics, name, metric);
break;
- /**
- * Task metric
- * format:
3:<job_id>:<task_id>:<subtask_index>:<user_scope>.<name>
- *
- * As the WebInterface task metric queries
currently do not account for subtasks we don't
- * divide by subtask and instead use the
concatenation of subtask index and metric name as the name.
- */
- case "3":
- if (components.length != 5) {
- break;
- }
- job = jobs.get(components[1]);
+ case INFO_CATEGORY_TASK:
+ QueryScopeInfo.TaskQueryScopeInfo
taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
+ job = jobs.get(taskInfo.jobID);
if (job == null) {
job = new JobMetricStore();
- jobs.put(components[1], job);
+ jobs.put(taskInfo.jobID, job);
}
- task = job.tasks.get(components[2]);
+ task = job.tasks.get(taskInfo.vertexID);
if (task == null) {
task = new TaskMetricStore();
- job.tasks.put(components[2],
task);
+
job.tasks.put(taskInfo.vertexID, task);
}
- task.metrics.put(components[3] + "." +
components[4], value);
+ /**
+ * As the WebInterface task metric
queries currently do not account for subtasks we don't
+ * divide by subtask and instead use
the concatenation of subtask index and metric name as the name.
+ */
+ addMetric(task.metrics,
taskInfo.subtaskIndex + "." + name, metric);
break;
- /**
- * Operator metric
- * format:
4:<job_id>:<task_id>:<subtask_index>:<operator_name>:<user_scope>.<name>
- *
- * As the WebInterface does not account for
operators (because it can't) we don't
- * divide by operator and instead use the
concatenation of subtask index, operator name and metric name
- * as the name.
- */
- case "4":
- if (components.length != 6) {
- break;
- }
- job = jobs.get(components[1]);
+ case INFO_CATEGORY_OPERATOR:
+ QueryScopeInfo.OperatorQueryScopeInfo
operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
+ job = jobs.get(operatorInfo.jobID);
if (job == null) {
job = new JobMetricStore();
- jobs.put(components[1], job);
+ jobs.put(operatorInfo.jobID,
job);
}
- task = job.tasks.get(components[2]);
+ task =
job.tasks.get(operatorInfo.vertexID);
if (task == null) {
task = new TaskMetricStore();
- job.tasks.put(components[2],
task);
+
job.tasks.put(operatorInfo.vertexID, task);
}
- task.metrics.put(components[3] + "." +
components[4] + "." + components[5], value);
+ /**
+ * As the WebInterface does not account
for operators (because it can't) we don't
+ * divide by operator and instead use
the concatenation of subtask index, operator name and metric name
+ * as the name.
+ */
+ addMetric(task.metrics,
operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name,
metric);
break;
default:
- LOG.debug("Invalid metric name format:
" + name);
+ LOG.debug("Invalid metric dump
category: " + info.getCategory());
}
} catch (Exception e) {
- LOG.debug("Malformed metric name format: " + name);
+ LOG.debug("Malformed metric dump.", e);
+ }
+ }
+
+ private void addMetric(Map<String, Object> target, String name,
MetricDump metric) {
+ switch (metric.getCategory()) {
+ case METRIC_CATEGORY_COUNTER:
--- End diff --
The same question applies here as well: why don't we simply check the type
of the metric instance instead of switching on `metric.getCategory()`?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---