This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4573d48008d3b19fa7e90ada7d2783ed02ba71d0 Author: zentol <[email protected]> AuthorDate: Mon Sep 3 10:44:07 2018 +0200 [hotfix][metrics] Rename TaskMetricGroup#addOperator --- .../org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java | 6 +++--- .../flink/runtime/metrics/groups/UnregisteredMetricGroups.java | 2 +- .../src/main/java/org/apache/flink/runtime/operators/BatchTask.java | 2 +- .../main/java/org/apache/flink/runtime/operators/DataSinkTask.java | 2 +- .../java/org/apache/flink/runtime/operators/DataSourceTask.java | 2 +- .../org/apache/flink/runtime/operators/chaining/ChainedDriver.java | 2 +- .../org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java | 2 +- .../apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java | 2 +- .../runtime/operators/chaining/ChainedOperatorsMetricTest.java | 4 ++-- .../flink/streaming/api/operators/AbstractStreamOperator.java | 2 +- .../flink/streaming/runtime/tasks/OneInputStreamTaskTest.java | 6 +++--- .../flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java | 6 +++--- 12 files changed, 19 insertions(+), 19 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java index 124fbf2..39d98d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java @@ -134,11 +134,11 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr // operators and cleanup // ------------------------------------------------------------------------ - public OperatorMetricGroup addOperator(String name) { - return addOperator(OperatorID.fromJobVertexID(vertexId), name); + public OperatorMetricGroup getOrAddOperator(String name) { + return getOrAddOperator(OperatorID.fromJobVertexID(vertexId), name); } - public 
OperatorMetricGroup addOperator(OperatorID operatorID, String name) { + public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) { if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) { LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH); name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java index 3869aa6..8c635eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java @@ -145,7 +145,7 @@ public class UnregisteredMetricGroups { } @Override - public OperatorMetricGroup addOperator(OperatorID operatorID, String name) { + public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) { return createUnregisteredOperatorMetricGroup(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index e697869..d5f2fd0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -254,7 +254,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme String headName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim(); this.metrics = getEnvironment().getMetricGroup() - .addOperator(headName.startsWith("CHAIN") ? headName.substring(6) : headName); + .getOrAddOperator(headName.startsWith("CHAIN") ? 
headName.substring(6) : headName); this.metrics.getIOMetricGroup().reuseInputMetricsForTask(); if (config.getNumberOfChainedStubs() == 0) { this.metrics.getIOMetricGroup().reuseOutputMetricsForTask(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index 0ea376e..2c26301 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -409,6 +409,6 @@ public class DataSinkTask<IT> extends AbstractInvokable { return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(), getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), - getEnvironment().getMetricGroup().addOperator(getEnvironment().getTaskInfo().getTaskName())); + getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName())); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index cdfe1fa..a5ccfab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -402,6 +402,6 @@ public class DataSourceTask<OT> extends AbstractInvokable { sourceName = sourceName.startsWith("CHAIN") ? 
sourceName.substring(6) : sourceName; return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(), getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), - getEnvironment().getMetricGroup().addOperator(sourceName)); + getEnvironment().getMetricGroup().getOrAddOperator(sourceName)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java index 442a53c..7dd3a14 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java @@ -69,7 +69,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> { this.config = config; this.taskName = taskName; this.userCodeClassLoader = userCodeClassLoader; - this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName); + this.metrics = parent.getEnvironment().getMetricGroup().getOrAddOperator(taskName); this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter(); this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter(); this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java index 58198e4..2f62f55 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java @@ -93,7 +93,7 @@ public class OperatorGroupTest extends TestLogger { OperatorMetricGroup operatorGroup = new TaskManagerMetricGroup(registry, "theHostName", tmID) .addTaskForJob(jid, "myJobName", vertexId, new 
ExecutionAttemptID(), "aTaskname", 13, 2) - .addOperator(operatorID, operatorName); + .getOrAddOperator(operatorID, operatorName); assertArrayEquals( new String[]{tmID, jid.toString(), vertexId.toString(), operatorName, operatorID.toString()}, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java index d9e6158..079c7c6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java @@ -176,7 +176,7 @@ public class TaskMetricGroupTest extends TestLogger { TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new JobVertexID(), new AbstractID(), "task", 0, 0); String originalName = new String(new char[100]).replace("\0", "-"); - OperatorMetricGroup operatorMetricGroup = taskMetricGroup.addOperator(originalName); + OperatorMetricGroup operatorMetricGroup = taskMetricGroup.getOrAddOperator(originalName); String storedName = operatorMetricGroup.getScopeComponents()[0]; Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, storedName.length()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java index 29ff6e8..c393af5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java @@ -117,7 +117,7 @@ public class ChainedOperatorsMetricTest extends TaskTestBase { // verify head operator metrics { // this only returns the existing group and doesn't create a new one - final OperatorMetricGroup operatorMetricGroup1 = 
taskMetricGroup.addOperator(HEAD_OPERATOR_NAME); + final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.getOrAddOperator(HEAD_OPERATOR_NAME); final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup(); final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter(); final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter(); @@ -129,7 +129,7 @@ public class ChainedOperatorsMetricTest extends TaskTestBase { // verify chained operator metrics { // this only returns the existing group and doesn't create a new one - final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(CHAINED_OPERATOR_NAME); + final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.getOrAddOperator(CHAINED_OPERATOR_NAME); final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup(); final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter(); final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 52ba3e4..f52168b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -171,7 +171,7 @@ public abstract class AbstractStreamOperator<OUT> this.container = containingTask; this.config = config; try { - OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().addOperator(config.getOperatorID(), config.getOperatorName()); + OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName()); this.output = new CountingOutput(output, 
operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter()); if (config.isChainStart()) { operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 96eaa78..f6d5021 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -626,7 +626,7 @@ public class OneInputStreamTaskTest extends TestLogger { final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() { @Override - public OperatorMetricGroup addOperator(OperatorID operatorID, String name) { + public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) { return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name); } }; @@ -682,13 +682,13 @@ public class OneInputStreamTaskTest extends TestLogger { InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup(); InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() { @Override - public OperatorMetricGroup addOperator(OperatorID id, String name) { + public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) { if (id.equals(headOperatorId)) { return headOperatorMetricGroup; } else if (id.equals(chainedOperatorId)) { return chainedOperatorMetricGroup; } else { - return super.addOperator(id, name); + return super.getOrAddOperator(id, name); } } }; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 5d15157..c48b647 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -402,7 +402,7 @@ public class TwoInputStreamTaskTest { final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() { @Override - public OperatorMetricGroup addOperator(OperatorID operatorID, String name) { + public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) { return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name); } }; @@ -470,13 +470,13 @@ public class TwoInputStreamTaskTest { InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup(); InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() { @Override - public OperatorMetricGroup addOperator(OperatorID id, String name) { + public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) { if (id.equals(headOperatorId)) { return headOperatorMetricGroup; } else if (id.equals(chainedOperatorId)) { return chainedOperatorMetricGroup; } else { - return super.addOperator(id, name); + return super.getOrAddOperator(id, name); } } };
