Repository: flink
Updated Branches:
  refs/heads/master 21e8e2dcf -> 9948d4844


[FLINK-4774] [metrics] Fix scope concatenation in QueryScopeInfo

This closes #2613.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9948d484
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9948d484
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9948d484

Branch: refs/heads/master
Commit: 9948d48443460a881eb79983f586c7bd18201674
Parents: 21e8e2d
Author: zentol <ches...@apache.org>
Authored: Fri Oct 7 13:11:58 2016 +0200
Committer: zentol <ches...@apache.org>
Committed: Fri Oct 14 13:58:58 2016 +0200

----------------------------------------------------------------------
 .../runtime/metrics/dump/QueryScopeInfo.java    |  16 +-
 .../runtime/metrics/groups/MetricGroupTest.java |  21 ++-
 .../metrics/groups/QueryScopeInfoTest.java      | 156 +++++++++++++++++++
 3 files changed, 181 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9948d484/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
index df5c2bf..6572ca0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
@@ -50,6 +50,12 @@ public abstract class QueryScopeInfo {
      */
        public abstract byte getCategory();
 
+       protected String concatScopes(String additionalScope) {
+               return scope.isEmpty()
+                       ? additionalScope
+                       : scope + "." + additionalScope;
+       }
+
        /**
         * Container for the job manager scope. Stores no additional 
information.
      */
@@ -64,7 +70,7 @@ public abstract class QueryScopeInfo {
 
                @Override
                public JobManagerQueryScopeInfo copy(String additionalScope) {
-                       return new JobManagerQueryScopeInfo(this.scope + 
additionalScope);
+                       return new 
JobManagerQueryScopeInfo(concatScopes(additionalScope));
                }
 
                @Override
@@ -90,7 +96,7 @@ public abstract class QueryScopeInfo {
 
                @Override
                public TaskManagerQueryScopeInfo copy(String additionalScope) {
-                       return new 
TaskManagerQueryScopeInfo(this.taskManagerID, this.scope + additionalScope);
+                       return new 
TaskManagerQueryScopeInfo(this.taskManagerID, concatScopes(additionalScope));
                }
 
                @Override
@@ -116,7 +122,7 @@ public abstract class QueryScopeInfo {
 
                @Override
                public JobQueryScopeInfo copy(String additionalScope) {
-                       return new JobQueryScopeInfo(this.jobID, this.scope + 
additionalScope);
+                       return new JobQueryScopeInfo(this.jobID, 
concatScopes(additionalScope));
                }
 
                @Override
@@ -146,7 +152,7 @@ public abstract class QueryScopeInfo {
 
                @Override
                public TaskQueryScopeInfo copy(String additionalScope) {
-                       return new TaskQueryScopeInfo(this.jobID, 
this.vertexID, this.subtaskIndex, this.scope + additionalScope);
+                       return new TaskQueryScopeInfo(this.jobID, 
this.vertexID, this.subtaskIndex, concatScopes(additionalScope));
                }
 
                @Override
@@ -178,7 +184,7 @@ public abstract class QueryScopeInfo {
 
                @Override
                public OperatorQueryScopeInfo copy(String additionalScope) {
-                       return new OperatorQueryScopeInfo(this.jobID, 
this.vertexID, this.subtaskIndex, this.operatorName, this.scope + 
additionalScope);
+                       return new OperatorQueryScopeInfo(this.jobID, 
this.vertexID, this.subtaskIndex, this.operatorName, 
concatScopes(additionalScope));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9948d484/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 038fd1e..6a6e7aa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -128,13 +128,20 @@ public class MetricGroupTest extends TestLogger {
                TaskManagerMetricGroup tm = new 
TaskManagerMetricGroup(registry, "host", "id");
                TaskManagerJobMetricGroup job = new 
TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
                TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, 
eid, "taskName", 4, 5);
-               GenericMetricGroup userGroup = new GenericMetricGroup(registry, 
task, "hello");
-
-               QueryScopeInfo.TaskQueryScopeInfo info = 
(QueryScopeInfo.TaskQueryScopeInfo) userGroup.createQueryServiceMetricInfo(new 
DummyCharacterFilter());
-               assertEquals("hello", info.scope);
-               assertEquals(jid.toString(), info.jobID);
-               assertEquals(vid.toString(), info.vertexID);
-               assertEquals(4, info.subtaskIndex);
+               GenericMetricGroup userGroup1 = new 
GenericMetricGroup(registry, task, "hello");
+               GenericMetricGroup userGroup2 = new 
GenericMetricGroup(registry, userGroup1, "world");
+
+               QueryScopeInfo.TaskQueryScopeInfo info1 = 
(QueryScopeInfo.TaskQueryScopeInfo) userGroup1.createQueryServiceMetricInfo(new 
DummyCharacterFilter());
+               assertEquals("hello", info1.scope);
+               assertEquals(jid.toString(), info1.jobID);
+               assertEquals(vid.toString(), info1.vertexID);
+               assertEquals(4, info1.subtaskIndex);
+
+               QueryScopeInfo.TaskQueryScopeInfo info2 = 
(QueryScopeInfo.TaskQueryScopeInfo) userGroup2.createQueryServiceMetricInfo(new 
DummyCharacterFilter());
+               assertEquals("hello.world", info2.scope);
+               assertEquals(jid.toString(), info2.jobID);
+               assertEquals(vid.toString(), info2.vertexID);
+               assertEquals(4, info2.subtaskIndex);
        }
        
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9948d484/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
new file mode 100644
index 0000000..1ff804a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class QueryScopeInfoTest {
+       @Test
+       public void testJobManagerQueryScopeInfo() {
+               QueryScopeInfo.JobManagerQueryScopeInfo info = new 
QueryScopeInfo.JobManagerQueryScopeInfo();
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, 
info.getCategory());
+               assertEquals("", info.scope);
+               
+               info = info.copy("world");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, 
info.getCategory());
+               assertEquals("world", info.scope);
+
+               info = new QueryScopeInfo.JobManagerQueryScopeInfo("hello");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, 
info.getCategory());
+               assertEquals("hello", info.scope);
+
+               info = info.copy("world");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, 
info.getCategory());
+               assertEquals("hello.world", info.scope);
+       }
+
+       @Test
+       public void testTaskManagerQueryScopeInfo() {
+               QueryScopeInfo.TaskManagerQueryScopeInfo info = new 
QueryScopeInfo.TaskManagerQueryScopeInfo("tmid");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, 
info.getCategory());
+               assertEquals("", info.scope);
+               assertEquals("tmid", info.taskManagerID);
+
+               info = info.copy("world");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, 
info.getCategory());
+               assertEquals("world", info.scope);
+               assertEquals("tmid", info.taskManagerID);
+               
+               info = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", 
"hello");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, 
info.getCategory());
+               assertEquals("hello", info.scope);
+               assertEquals("tmid", info.taskManagerID);
+
+               info = info.copy("world");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, 
info.getCategory());
+               assertEquals("hello.world", info.scope);
+               assertEquals("tmid", info.taskManagerID);
+       }
+
+       @Test
+       public void testJobQueryScopeInfo() {
+               QueryScopeInfo.JobQueryScopeInfo info = new 
QueryScopeInfo.JobQueryScopeInfo("jobid");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, 
info.getCategory());
+               assertEquals("", info.scope);
+               assertEquals("jobid", info.jobID);
+
+               info = info.copy("world");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, 
info.getCategory());
+               assertEquals("world", info.scope);
+               assertEquals("jobid", info.jobID);
+
+               info = new QueryScopeInfo.JobQueryScopeInfo("jobid", "hello");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, 
info.getCategory());
+               assertEquals("hello", info.scope);
+               assertEquals("jobid", info.jobID);
+
+               info = info.copy("world");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, 
info.getCategory());
+               assertEquals("hello.world", info.scope);
+               assertEquals("jobid", info.jobID);
+       }
+
+       @Test
+       public void testTaskQueryScopeInfo() {
+               QueryScopeInfo.TaskQueryScopeInfo info = new 
QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2);
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, 
info.getCategory());
+               assertEquals("", info.scope);
+               assertEquals("jobid", info.jobID);
+               assertEquals("taskid", info.vertexID);
+               assertEquals(2, info.subtaskIndex);
+
+               info = info.copy("world");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, 
info.getCategory());
+               assertEquals("world", info.scope);
+               assertEquals("jobid", info.jobID);
+               assertEquals("taskid", info.vertexID);
+               assertEquals(2, info.subtaskIndex);
+
+               info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 
2, "hello");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, 
info.getCategory());
+               assertEquals("hello", info.scope);
+               assertEquals("jobid", info.jobID);
+               assertEquals("taskid", info.vertexID);
+               assertEquals(2, info.subtaskIndex);
+
+               info = info.copy("world");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, 
info.getCategory());
+               assertEquals("hello.world", info.scope);
+               assertEquals("jobid", info.jobID);
+               assertEquals("taskid", info.vertexID);
+               assertEquals(2, info.subtaskIndex);
+       }
+
+       @Test
+       public void testOperatorQueryScopeInfo() {
+               QueryScopeInfo.OperatorQueryScopeInfo info = new 
QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, 
info.getCategory());
+               assertEquals("", info.scope);
+               assertEquals("jobid", info.jobID);
+               assertEquals("taskid", info.vertexID);
+               assertEquals("opname", info.operatorName);
+               assertEquals(2, info.subtaskIndex);
+
+               info = info.copy("world");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, 
info.getCategory());
+               assertEquals("world", info.scope);
+               assertEquals("jobid", info.jobID);
+               assertEquals("taskid", info.vertexID);
+               assertEquals("opname", info.operatorName);
+               assertEquals(2, info.subtaskIndex);
+
+               info = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", 
"taskid", 2, "opname", "hello");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, 
info.getCategory());
+               assertEquals("hello", info.scope);
+               assertEquals("jobid", info.jobID);
+               assertEquals("taskid", info.vertexID);
+               assertEquals("opname", info.operatorName);
+               assertEquals(2, info.subtaskIndex);
+
+               info = info.copy("world");
+               assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, 
info.getCategory());
+               assertEquals("hello.world", info.scope);
+               assertEquals("jobid", info.jobID);
+               assertEquals("taskid", info.vertexID);
+               assertEquals("opname", info.operatorName);
+               assertEquals(2, info.subtaskIndex);
+       }
+}

Reply via email to