This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 93ac95866e4473982a89e563d58ef0e374b3c0ba Author: zentol <[email protected]> AuthorDate: Tue Aug 21 18:37:07 2018 +0200 [FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch --- .../runtime/metrics/groups/TaskMetricGroup.java | 8 +- .../chaining/ChainedOperatorsMetricTest.java | 175 +++++++++++++++++++++ .../operators/testutils/MockEnvironment.java | 10 +- .../testutils/MockEnvironmentBuilder.java | 11 +- 4 files changed, 197 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java index 441dbf8..124fbf2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java @@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGroup> { - private final Map<OperatorID, OperatorMetricGroup> operators = new HashMap<>(); + private final Map<String, OperatorMetricGroup> operators = new HashMap<>(); static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80; @@ -144,15 +144,17 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH); } OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, operatorID, name); + // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch operators + final String key = operatorID + name; synchronized (this) { - OperatorMetricGroup previous = operators.put(operatorID, operator); + OperatorMetricGroup previous = operators.put(key, operator); if (previous == null) { // no operator group so far return operator; } else { // already had an operator group. restore that one. - operators.put(operatorID, previous); + operators.put(key, previous); return previous; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java new file mode 100644 index 0000000..29ff6e8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.chaining; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.runtime.operators.FlatMapDriver; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.operators.testutils.TaskTestBase; +import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; +import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory; +import org.apache.flink.types.Record; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * Metrics related tests for batch task chains. + */ +public class ChainedOperatorsMetricTest extends TaskTestBase { + + private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3; + + private static final int NETWORK_BUFFER_SIZE = 1024; + + private static final TypeSerializerFactory<Record> serFact = RecordSerializerFactory.get(); + + private final List<Record> outList = new ArrayList<>(); + + private static final String HEAD_OPERATOR_NAME = "headoperator"; + private static final String CHAINED_OPERATOR_NAME = "chainedoperator"; + + @Test + public void testOperatorIOMetricReuse() throws Exception { + // environment + initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE); + this.mockEnv = new MockEnvironmentBuilder() + .setTaskName(HEAD_OPERATOR_NAME) + .setMemorySize(MEMORY_MANAGER_SIZE) + .setInputSplitProvider(this.inputSplitProvider) + .setBufferSize(NETWORK_BUFFER_SIZE) + .setMetricGroup(new TaskMetricGroup( + NoOpMetricRegistry.INSTANCE, + UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), + new JobVertexID(), + new AbstractID(), + "task", + 0, + 0)) + .build(); + + final int keyCnt = 100; + final int valCnt = 20; + final int numRecords = keyCnt * valCnt; + addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0); + addOutput(this.outList); + + // the chained operator + addChainedOperator(); + + // creates the head operator and assembles the chain + registerTask(FlatMapDriver.class, DuplicatingFlatMapFunction.class); + final BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>(this.mockEnv); + + testTask.invoke(); + + Assert.assertEquals(numRecords * 2 * 2, this.outList.size()); + + final TaskMetricGroup taskMetricGroup = mockEnv.getMetricGroup(); + + // verify task-level metrics + { + final TaskIOMetricGroup ioMetricGroup = taskMetricGroup.getIOMetricGroup(); + final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter(); + final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter(); + + Assert.assertEquals(numRecords, numRecordsInCounter.getCount()); + Assert.assertEquals(numRecords * 2 * 2, numRecordsOutCounter.getCount()); + } + + // verify head operator metrics + { + // this only returns the existing group and doesn't create a new one + final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(HEAD_OPERATOR_NAME); + final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup(); + final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter(); + final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter(); + + Assert.assertEquals(numRecords, numRecordsInCounter.getCount()); + Assert.assertEquals(numRecords * 2, numRecordsOutCounter.getCount()); + } + + // verify chained operator metrics + { + // this only returns the existing group and doesn't create a new one + final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(CHAINED_OPERATOR_NAME); + final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup(); + final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter(); + final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter(); + + Assert.assertEquals(numRecords * 2, numRecordsInCounter.getCount()); + Assert.assertEquals(numRecords * 2 * 2, numRecordsOutCounter.getCount()); + } + } + + private void addChainedOperator() { + final TaskConfig chainedConfig = new TaskConfig(new Configuration()); + + // input + chainedConfig.addInputToGroup(0); + chainedConfig.setInputSerializer(serFact, 0); + + // output + chainedConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); + chainedConfig.setOutputSerializer(serFact); + + // driver + chainedConfig.setDriverStrategy(DriverStrategy.FLAT_MAP); + + // udf + chainedConfig.setStubWrapper(new UserCodeClassWrapper<>(DuplicatingFlatMapFunction.class)); + + getTaskConfig().addChainedTask(ChainedFlatMapDriver.class, chainedConfig, CHAINED_OPERATOR_NAME); + } + + /** + * Simple {@link FlatMapFunction} that duplicates the input. + */ + public static class DuplicatingFlatMapFunction extends RichFlatMapFunction<Record, Record> { + + private static final long serialVersionUID = -1152068682935346164L; + + @Override + public void flatMap(final Record value, final Collector<Record> out) throws Exception { + out.collect(value); + out.collect(value); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 4bf94e9..68858bc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -41,7 +41,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.TaskStateManager; @@ -108,6 +107,8 @@ public class MockEnvironment implements Environment, AutoCloseable { private Optional<Throwable> actualExternalFailureCause = Optional.empty(); + private final TaskMetricGroup taskMetricGroup; + public static MockEnvironmentBuilder builder() { return new MockEnvironmentBuilder(); } @@ -125,7 +126,8 @@ public class MockEnvironment implements Environment, AutoCloseable { int maxParallelism, int parallelism, int subtaskIndex, - ClassLoader userCodeClassLoader) { + ClassLoader userCodeClassLoader, + TaskMetricGroup taskMetricGroup) { this.jobID = jobID; this.jobVertexID = jobVertexID; @@ -150,6 +152,8 @@ public class MockEnvironment implements Environment, AutoCloseable { this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader); this.taskStateManager = Preconditions.checkNotNull(taskStateManager); + + this.taskMetricGroup = taskMetricGroup; } @@ -213,7 +217,7 @@ public class MockEnvironment implements Environment, AutoCloseable { @Override public TaskMetricGroup getMetricGroup() { - return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); + return taskMetricGroup; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java index dfb10d4..dfcc5f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.TestTaskStateManager; @@ -40,6 +42,7 @@ public class MockEnvironmentBuilder { private ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader(); private JobID jobID = new JobID(); private JobVertexID jobVertexID = new JobVertexID(); + private TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); public MockEnvironmentBuilder setTaskName(String taskName) { this.taskName = taskName; @@ -106,6 +109,11 @@ public class MockEnvironmentBuilder { return this; } + public MockEnvironmentBuilder setMetricGroup(TaskMetricGroup taskMetricGroup) { + this.taskMetricGroup = taskMetricGroup; + return this; + } + public MockEnvironment build() { return new MockEnvironment( jobID, @@ -120,6 +128,7 @@ public class MockEnvironmentBuilder { maxParallelism, parallelism, subtaskIndex, - userCodeClassLoader); + userCodeClassLoader, + taskMetricGroup); } }
