This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4573d48008d3b19fa7e90ada7d2783ed02ba71d0
Author: zentol <[email protected]>
AuthorDate: Mon Sep 3 10:44:07 2018 +0200

    [hotfix][metrics] Rename TaskMetricGroup#addOperator
---
 .../org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java    | 6 +++---
 .../flink/runtime/metrics/groups/UnregisteredMetricGroups.java      | 2 +-
 .../src/main/java/org/apache/flink/runtime/operators/BatchTask.java | 2 +-
 .../main/java/org/apache/flink/runtime/operators/DataSinkTask.java  | 2 +-
 .../java/org/apache/flink/runtime/operators/DataSourceTask.java     | 2 +-
 .../org/apache/flink/runtime/operators/chaining/ChainedDriver.java  | 2 +-
 .../org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java  | 2 +-
 .../apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java    | 2 +-
 .../runtime/operators/chaining/ChainedOperatorsMetricTest.java      | 4 ++--
 .../flink/streaming/api/operators/AbstractStreamOperator.java       | 2 +-
 .../flink/streaming/runtime/tasks/OneInputStreamTaskTest.java       | 6 +++---
 .../flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java       | 6 +++---
 12 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 124fbf2..39d98d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -134,11 +134,11 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
        //  operators and cleanup
        // 
------------------------------------------------------------------------
 
-       public OperatorMetricGroup addOperator(String name) {
-               return addOperator(OperatorID.fromJobVertexID(vertexId), name);
+       public OperatorMetricGroup getOrAddOperator(String name) {
+               return getOrAddOperator(OperatorID.fromJobVertexID(vertexId), 
name);
        }
 
-       public OperatorMetricGroup addOperator(OperatorID operatorID, String 
name) {
+       public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, 
String name) {
                if (name != null && name.length() > 
METRICS_OPERATOR_NAME_MAX_LENGTH) {
                        LOG.warn("The operator name {} exceeded the {} 
characters length limit and was truncated.", name, 
METRICS_OPERATOR_NAME_MAX_LENGTH);
                        name = name.substring(0, 
METRICS_OPERATOR_NAME_MAX_LENGTH);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
index 3869aa6..8c635eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
@@ -145,7 +145,7 @@ public class UnregisteredMetricGroups {
                }
 
                @Override
-               public OperatorMetricGroup addOperator(OperatorID operatorID, 
String name) {
+               public OperatorMetricGroup getOrAddOperator(OperatorID 
operatorID, String name) {
                        return createUnregisteredOperatorMetricGroup();
                }
        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index e697869..d5f2fd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -254,7 +254,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 
                String headName =  
getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
                this.metrics = getEnvironment().getMetricGroup()
-                       .addOperator(headName.startsWith("CHAIN") ? 
headName.substring(6) : headName);
+                       .getOrAddOperator(headName.startsWith("CHAIN") ? 
headName.substring(6) : headName);
                this.metrics.getIOMetricGroup().reuseInputMetricsForTask();
                if (config.getNumberOfChainedStubs() == 0) {
                        
this.metrics.getIOMetricGroup().reuseOutputMetricsForTask();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 0ea376e..2c26301 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -409,6 +409,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 
                return new DistributedRuntimeUDFContext(env.getTaskInfo(), 
getUserCodeClassLoader(),
                                getExecutionConfig(), 
env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), 
-                               
getEnvironment().getMetricGroup().addOperator(getEnvironment().getTaskInfo().getTaskName()));
+                               
getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()));
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index cdfe1fa..a5ccfab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -402,6 +402,6 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                sourceName = sourceName.startsWith("CHAIN") ? 
sourceName.substring(6) : sourceName;
                return new DistributedRuntimeUDFContext(env.getTaskInfo(), 
getUserCodeClassLoader(),
                                getExecutionConfig(), 
env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), 
-                               
getEnvironment().getMetricGroup().addOperator(sourceName));
+                               
getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index 442a53c..7dd3a14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -69,7 +69,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
                this.config = config;
                this.taskName = taskName;
                this.userCodeClassLoader = userCodeClassLoader;
-               this.metrics = 
parent.getEnvironment().getMetricGroup().addOperator(taskName);
+               this.metrics = 
parent.getEnvironment().getMetricGroup().getOrAddOperator(taskName);
                this.numRecordsIn = 
this.metrics.getIOMetricGroup().getNumRecordsInCounter();
                this.numRecordsOut = 
this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
                this.outputCollector = new CountingCollector<>(outputCollector, 
numRecordsOut);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index 58198e4..2f62f55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -93,7 +93,7 @@ public class OperatorGroupTest extends TestLogger {
                        OperatorMetricGroup operatorGroup =
                                new TaskManagerMetricGroup(registry, 
"theHostName", tmID)
                                        .addTaskForJob(jid, "myJobName", 
vertexId, new ExecutionAttemptID(), "aTaskname", 13, 2)
-                                       .addOperator(operatorID, operatorName);
+                                       .getOrAddOperator(operatorID, 
operatorName);
 
                        assertArrayEquals(
                                new String[]{tmID, jid.toString(), 
vertexId.toString(), operatorName, operatorID.toString()},
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index d9e6158..079c7c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -176,7 +176,7 @@ public class TaskMetricGroupTest extends TestLogger {
                TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, 
job, new JobVertexID(), new AbstractID(), "task", 0, 0);
 
                String originalName = new String(new char[100]).replace("\0", 
"-");
-               OperatorMetricGroup operatorMetricGroup = 
taskMetricGroup.addOperator(originalName);
+               OperatorMetricGroup operatorMetricGroup = 
taskMetricGroup.getOrAddOperator(originalName);
 
                String storedName = operatorMetricGroup.getScopeComponents()[0];
                
Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, 
storedName.length());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
index 29ff6e8..c393af5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
@@ -117,7 +117,7 @@ public class ChainedOperatorsMetricTest extends TaskTestBase {
                // verify head operator metrics
                {
                        // this only returns the existing group and doesn't 
create a new one
-                       final OperatorMetricGroup operatorMetricGroup1 = 
taskMetricGroup.addOperator(HEAD_OPERATOR_NAME);
+                       final OperatorMetricGroup operatorMetricGroup1 = 
taskMetricGroup.getOrAddOperator(HEAD_OPERATOR_NAME);
                        final OperatorIOMetricGroup ioMetricGroup = 
operatorMetricGroup1.getIOMetricGroup();
                        final Counter numRecordsInCounter = 
ioMetricGroup.getNumRecordsInCounter();
                        final Counter numRecordsOutCounter = 
ioMetricGroup.getNumRecordsOutCounter();
@@ -129,7 +129,7 @@ public class ChainedOperatorsMetricTest extends TaskTestBase {
                // verify chained operator metrics
                {
                        // this only returns the existing group and doesn't 
create a new one
-                       final OperatorMetricGroup operatorMetricGroup1 = 
taskMetricGroup.addOperator(CHAINED_OPERATOR_NAME);
+                       final OperatorMetricGroup operatorMetricGroup1 = 
taskMetricGroup.getOrAddOperator(CHAINED_OPERATOR_NAME);
                        final OperatorIOMetricGroup ioMetricGroup = 
operatorMetricGroup1.getIOMetricGroup();
                        final Counter numRecordsInCounter = 
ioMetricGroup.getNumRecordsInCounter();
                        final Counter numRecordsOutCounter = 
ioMetricGroup.getNumRecordsOutCounter();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 52ba3e4..f52168b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -171,7 +171,7 @@ public abstract class AbstractStreamOperator<OUT>
                this.container = containingTask;
                this.config = config;
                try {
-                       OperatorMetricGroup operatorMetricGroup = 
environment.getMetricGroup().addOperator(config.getOperatorID(), 
config.getOperatorName());
+                       OperatorMetricGroup operatorMetricGroup = 
environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), 
config.getOperatorName());
                        this.output = new CountingOutput(output, 
operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
                        if (config.isChainStart()) {
                                
operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 96eaa78..f6d5021 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -626,7 +626,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                final TaskMetricGroup taskMetricGroup = new 
UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
                        @Override
-                       public OperatorMetricGroup addOperator(OperatorID 
operatorID, String name) {
+                       public OperatorMetricGroup getOrAddOperator(OperatorID 
operatorID, String name) {
                                return new 
OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
                        }
                };
@@ -682,13 +682,13 @@ public class OneInputStreamTaskTest extends TestLogger {
                InterceptingOperatorMetricGroup chainedOperatorMetricGroup = 
new InterceptingOperatorMetricGroup();
                InterceptingTaskMetricGroup taskMetricGroup = new 
InterceptingTaskMetricGroup() {
                        @Override
-                       public OperatorMetricGroup addOperator(OperatorID id, 
String name) {
+                       public OperatorMetricGroup getOrAddOperator(OperatorID 
id, String name) {
                                if (id.equals(headOperatorId)) {
                                        return headOperatorMetricGroup;
                                } else if (id.equals(chainedOperatorId)) {
                                        return chainedOperatorMetricGroup;
                                } else {
-                                       return super.addOperator(id, name);
+                                       return super.getOrAddOperator(id, name);
                                }
                        }
                };
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 5d15157..c48b647 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -402,7 +402,7 @@ public class TwoInputStreamTaskTest {
 
                final TaskMetricGroup taskMetricGroup = new 
UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
                        @Override
-                       public OperatorMetricGroup addOperator(OperatorID 
operatorID, String name) {
+                       public OperatorMetricGroup getOrAddOperator(OperatorID 
operatorID, String name) {
                                return new 
OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
                        }
                };
@@ -470,13 +470,13 @@ public class TwoInputStreamTaskTest {
                InterceptingOperatorMetricGroup chainedOperatorMetricGroup = 
new InterceptingOperatorMetricGroup();
                InterceptingTaskMetricGroup taskMetricGroup = new 
InterceptingTaskMetricGroup() {
                        @Override
-                       public OperatorMetricGroup addOperator(OperatorID id, 
String name) {
+                       public OperatorMetricGroup getOrAddOperator(OperatorID 
id, String name) {
                                if (id.equals(headOperatorId)) {
                                        return headOperatorMetricGroup;
                                } else if (id.equals(chainedOperatorId)) {
                                        return chainedOperatorMetricGroup;
                                } else {
-                                       return super.addOperator(id, name);
+                                       return super.getOrAddOperator(id, name);
                                }
                        }
                };

Reply via email to