This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3b6eecb6965054e8c1a3600117af0d0976d8f842 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Fri May 24 15:14:27 2024 +0200 [FLINK-38158][metrics] Add custom scope variables to operators https://cwiki.apache.org/confluence/display/FLINK/FLIP-484%3A+Add+custom+metric+variables+to+operators When running a Flink job that reads data from multiple sources and writes to multiple sinks, it would be helpful to group metrics from each source/sink instance together, for example based on the underlying name of the source/sink table/topic. --- .../base/source/reader/SourceMetricsITCase.java | 4 ++ .../org/apache/flink/api/dag/Transformation.java | 30 ++++++++++++++ .../groups/InternalOperatorMetricGroup.java | 7 +++- .../runtime/metrics/groups/TaskMetricGroup.java | 12 ++++-- .../metrics/groups/UnregisteredMetricGroups.java | 13 +++++- .../streaming/api/datastream/DataStreamSink.java | 16 ++++++++ .../api/datastream/SingleOutputStreamOperator.java | 13 ++++++ .../api/graph/SimpleTransformationTranslator.java | 4 ++ .../flink/streaming/api/graph/StreamConfig.java | 33 +++++++++++++-- .../flink/streaming/api/graph/StreamGraph.java | 8 ++++ .../streaming/api/graph/StreamGraphGenerator.java | 4 ++ .../flink/streaming/api/graph/StreamNode.java | 10 +++++ .../api/graph/StreamingJobGraphGenerator.java | 1 + .../api/operators/AbstractStreamOperator.java | 5 ++- .../api/operators/AbstractStreamOperatorV2.java | 5 ++- .../streaming/runtime/tasks/OperatorChain.java | 4 +- .../translators/SinkTransformationTranslator.java | 15 +++++++ .../metrics/groups/InternalOperatorGroupTest.java | 48 ++++++++++++++-------- .../runtime/tasks/MultipleInputStreamTaskTest.java | 10 +++-- .../runtime/tasks/OneInputStreamTaskTest.java | 4 +- .../runtime/tasks/TwoInputStreamTaskTest.java | 4 +- .../test/streaming/runtime/SinkMetricsITCase.java | 5 ++- 22 files changed, 214 insertions(+), 41 deletions(-) diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java index d12df8e7b22..b27d528fa30 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java @@ -55,6 +55,7 @@ import java.util.concurrent.CyclicBarrier; import static org.apache.flink.metrics.testutils.MetricAssertions.assertThatCounter; import static org.apache.flink.metrics.testutils.MetricAssertions.assertThatGauge; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; /** Tests whether all provided metrics of a {@link Source} are of the expected values (FLIP-33). */ class SourceMetricsITCase { @@ -113,6 +114,7 @@ class SourceMetricsITCase { int stopAtRecord2 = numRecordsPerSplit - 1; DataStream<Integer> stream = env.fromSource(source, strategy, "MetricTestingSource") + .addMetricVariable("foo", "42") .map( i -> { if (i % numRecordsPerSplit == stopAtRecord1 @@ -165,6 +167,8 @@ class SourceMetricsITCase { int subtaskWithMetrics = 0; for (OperatorMetricGroup group : groups) { + assertThat(group.getAllVariables()).contains(entry("foo", "42")); + Map<String, Metric> metrics = reporter.getMetricsByGroup(group); // there are only 2 splits assigned; so two groups will not update metrics if (group.getIOMetricGroup().getNumRecordsInCounter().getCount() == 0) { diff --git a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java index bce2b749e7a..858fe3c9212 100644 --- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java +++ 
b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java @@ -118,6 +118,7 @@ public abstract class Transformation<T> { // If true, the parallelism of the transformation is explicitly set and should be respected. // Otherwise the parallelism can be changed at runtime. private boolean parallelismConfigured; + private @Nullable Map<String, String> additionalMetricVariables; public static int getNewNodeId() { return ID_COUNTER.incrementAndGet(); @@ -456,6 +457,34 @@ public abstract class Transformation<T> { return uid; } + /** + * @return additional variables that will be added to scope of the metrics reported from this + * {@link Transformation}, or {@code null} if none have been added. + */ + public @Nullable Map<String, String> getAdditionalMetricVariables() { + return additionalMetricVariables; + } + + /** + * Adds additional variables that will be added to scope of the metrics reported from this + * transformation. + * + * <p>Some transformations might be translated into multiple operators and in such cases, metric + * variables might be assigned to just one specific operator. For example {@code + * SinkTransformation}'s additional variables are only inherited by the writer operator. They + * are not used for committer or global committer. + * + * @param key name of the metric variable + * @param value value of the metric variable + */ + public Transformation<T> addMetricVariable(String key, String value) { + if (additionalMetricVariables == null) { + additionalMetricVariables = new HashMap<>(); + } + additionalMetricVariables.put(key, value); + return this; + } + /** * Returns the slot sharing group of this transformation if present.
* @@ -652,6 +681,7 @@ public abstract class Transformation<T> { Transformation<?> that = (Transformation<?>) o; return Objects.equals(bufferTimeout, that.bufferTimeout) && Objects.equals(id, that.id) + && Objects.equals(additionalMetricVariables, that.additionalMetricVariables) && Objects.equals(parallelism, that.parallelism) && Objects.equals(name, that.name) && Objects.equals(outputType, that.outputType); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java index 87918f1f564..840626134a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java @@ -38,14 +38,15 @@ public class InternalOperatorMetricGroup extends ComponentMetricGroup<TaskMetric implements OperatorMetricGroup { private final String operatorName; private final OperatorID operatorID; - private final InternalOperatorIOMetricGroup ioMetrics; + private final Map<String, String> additionalVariables; InternalOperatorMetricGroup( MetricRegistry registry, TaskMetricGroup parent, OperatorID operatorID, - String operatorName) { + String operatorName, + Map<String, String> additionalVariables) { super( registry, registry.getScopeFormats() @@ -54,6 +55,7 @@ public class InternalOperatorMetricGroup extends ComponentMetricGroup<TaskMetric parent); this.operatorID = operatorID; this.operatorName = operatorName; + this.additionalVariables = additionalVariables; ioMetrics = new InternalOperatorIOMetricGroup(this); } @@ -97,6 +99,7 @@ public class InternalOperatorMetricGroup extends ComponentMetricGroup<TaskMetric protected void putVariables(Map<String, String> variables) { variables.put(ScopeFormat.SCOPE_OPERATOR_ID, String.valueOf(operatorID)); variables.put(ScopeFormat.SCOPE_OPERATOR_NAME, 
operatorName); + variables.putAll(additionalVariables); // we don't enter the subtask_index as the task group does that already } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java index 4967aafeae8..202c24276cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java @@ -31,6 +31,7 @@ import org.apache.flink.util.AbstractID; import javax.annotation.Nullable; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -144,11 +145,12 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr // ------------------------------------------------------------------------ public InternalOperatorMetricGroup getOrAddOperator(String operatorName) { - return getOrAddOperator(OperatorID.fromJobVertexID(vertexId), operatorName); + return getOrAddOperator( + OperatorID.fromJobVertexID(vertexId), operatorName, Collections.emptyMap()); } public InternalOperatorMetricGroup getOrAddOperator( - OperatorID operatorID, String operatorName) { + OperatorID operatorID, String operatorName, Map<String, String> additionalVariables) { final String truncatedOperatorName = MetricUtils.truncateOperatorName(operatorName); // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch @@ -160,7 +162,11 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr key, operator -> new InternalOperatorMetricGroup( - this.registry, this, operatorID, truncatedOperatorName)); + this.registry, + this, + operatorID, + truncatedOperatorName, + additionalVariables)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java index 345c9aeea3c..6c9446d2256 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java @@ -25,6 +25,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.util.AbstractID; +import java.util.Collections; +import java.util.Map; + import static org.apache.flink.runtime.executiongraph.ExecutionAttemptID.randomId; /** A collection of safe drop-in replacements for existing {@link ComponentMetricGroup}s. */ @@ -190,7 +193,8 @@ public class UnregisteredMetricGroups { } @Override - public InternalOperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) { + public InternalOperatorMetricGroup getOrAddOperator( + OperatorID operatorID, String name, Map<String, String> additionalVariables) { return createUnregisteredOperatorMetricGroup(this); } } @@ -205,7 +209,12 @@ public class UnregisteredMetricGroups { } UnregisteredOperatorMetricGroup(TaskMetricGroup parent) { - super(NoOpMetricRegistry.INSTANCE, parent, DEFAULT_OPERATOR_ID, DEFAULT_OPERATOR_NAME); + super( + NoOpMetricRegistry.INSTANCE, + parent, + DEFAULT_OPERATOR_ID, + DEFAULT_OPERATOR_NAME, + Collections.emptyMap()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index 42309b23520..8968b9a533e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -139,6 +139,22 @@ public class DataStreamSink<T> { return this; } + /** + * Adds additional variables that will be added to 
scope of the metrics reported from this + * operator. + * + * <p>The specified additional variables are only used for the writer operator. They are not + * used for committer or global committer. + * + * @param key name of the metric variable + * @param value value of the metric variable + */ + @PublicEvolving + public DataStreamSink<T> addMetricVariable(String key, String value) { + transformation.addMetricVariable(key, value); + return this; + } + /** * Sets an user provided hash for this operator. This will be used AS IS the create the * JobVertexID. diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 42bb8f30efe..e1da1ee8f02 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -102,6 +102,19 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> { return this; } + /** + * Adds additional variables that will be added to scope of the metrics reported from this + * operator. + * + * @param key name of the metric variable + * @param value value of the metric variable + */ + @PublicEvolving + public SingleOutputStreamOperator<T> addMetricVariable(String key, String value) { + transformation.addMetricVariable(key, value); + return this; + } + /** * Sets an user provided hash for this operator. This will be used AS IS the create the * JobVertexID.
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java index 90b154dbb46..098bf5a3343 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java @@ -101,6 +101,10 @@ public abstract class SimpleTransformationTranslator<OUT, T extends Transformati if (transformation.getUid() != null) { streamGraph.setTransformationUID(transformationId, transformation.getUid()); } + if (transformation.getAdditionalMetricVariables() != null) { + streamGraph.setAdditionalMetricVariables( + transformation.getId(), transformation.getAdditionalMetricVariables()); + } if (transformation.getUserProvidedNodeHash() != null) { streamGraph.setTransformationUserHash( transformationId, transformation.getUserProvidedNodeHash()); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 72e8c4e38e5..96295c38486 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -43,6 +43,8 @@ import org.apache.flink.util.OutputTag; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.concurrent.FutureUtils; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -117,6 +119,7 @@ public class StreamConfig implements Serializable { private static final ConfigOption<Boolean> GRAPH_CONTAINING_LOOPS = ConfigOptions.key("graphContainingLoops").booleanType().defaultValue(false); + private static final String ADDITIONAL_METRIC_VARIABLES = 
"additionalmetricvariables"; private static final String CHECKPOINT_STORAGE = "checkpointstorage"; private static final String STATE_BACKEND = "statebackend"; private static final String TIMER_SERVICE_PROVIDER = "timerservice"; @@ -635,6 +638,28 @@ public class StreamConfig implements Serializable { } } + public void setAdditionalMetricVariables( + @Nullable Map<String, String> additionalMetricVariables) { + if (additionalMetricVariables != null) { + toBeSerializedConfigObjects.put(ADDITIONAL_METRIC_VARIABLES, additionalMetricVariables); + } + } + + public Map<String, String> getAdditionalMetricVariables() { + try { + Map<String, String> additionalMetricVariables = + InstantiationUtil.readObjectFromConfig( + this.config, + ADDITIONAL_METRIC_VARIABLES, + this.getClass().getClassLoader()); + return additionalMetricVariables == null + ? Collections.emptyMap() + : additionalMetricVariables; + } catch (Exception e) { + throw new StreamTaskException("Could not instantiate additional metric variables.", e); + } + } + @VisibleForTesting public void setCheckpointStorage(CheckpointStorage storage) { if (storage != null) { @@ -879,9 +904,9 @@ public class StreamConfig implements Serializable { } } - public static boolean requiresSorting(StreamConfig.InputConfig inputConfig) { - return inputConfig instanceof StreamConfig.NetworkInputConfig - && ((StreamConfig.NetworkInputConfig) inputConfig).getInputRequirement() - == StreamConfig.InputRequirement.SORTED; + public static boolean requiresSorting(InputConfig inputConfig) { + return inputConfig instanceof NetworkInputConfig + && ((NetworkInputConfig) inputConfig).getInputRequirement() + == InputRequirement.SORTED; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 2b12308ef4a..a64d534ba93 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -1080,6 +1080,14 @@ public class StreamGraph implements Pipeline, ExecutionPlan { } } + public void setAdditionalMetricVariables( + Integer nodeId, Map<String, String> additionalMetricVariables) { + StreamNode node = streamNodes.get(nodeId); + if (node != null) { + node.setAdditionalMetricVariables(additionalMetricVariables); + } + } + void setTransformationUserHash(Integer nodeId, String nodeHash) { StreamNode node = streamNodes.get(nodeId); if (node != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index b0003d32483..f35da5d5c02 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -546,6 +546,10 @@ public class StreamGraphGenerator { if (transform.getUid() != null) { streamGraph.setTransformationUID(transform.getId(), transform.getUid()); } + if (transform.getAdditionalMetricVariables() != null) { + streamGraph.setAdditionalMetricVariables( + transform.getId(), transform.getAdditionalMetricVariables()); + } if (transform.getUserProvidedNodeHash() != null) { streamGraph.setTransformationUserHash( transform.getId(), transform.getUserProvidedNodeHash()); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 34fb5a57f9c..bf18fe082e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -99,6 +99,8 @@ public class StreamNode implements Serializable { private final Map<Integer, StreamConfig.InputRequirement> inputRequirements 
= new HashMap<>(); + private @Nullable Map<String, String> additionalMetricVariables; + private @Nullable IntermediateDataSetID consumeClusterDatasetId; private boolean supportsConcurrentExecutionAttempts = true; @@ -408,6 +410,14 @@ public class StreamNode implements Serializable { return inputRequirements; } + public @Nullable Map<String, String> getAdditionalMetricVariables() { + return additionalMetricVariables; + } + + public void setAdditionalMetricVariables(Map<String, String> additionalMetricVariables) { + this.additionalMetricVariables = additionalMetricVariables; + } + public Optional<OperatorCoordinator.Provider> getCoordinatorProvider( String operatorName, OperatorID operatorID) { if (operatorFactory != null && operatorFactory instanceof CoordinatedOperatorFactory) { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 199046274bc..1aa62761d43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -1253,6 +1253,7 @@ public class StreamingJobGraphGenerator { config.setStatePartitioner(i, vertex.getStatePartitioners()[i]); } config.setStateKeySerializer(vertex.getStateKeySerializer()); + config.setAdditionalMetricVariables(vertex.getAdditionalMetricVariables()); Class<? 
extends TaskInvokable> vertexClass = vertex.getJobVertexClass(); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 484b4512573..a4f2a756384 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -188,7 +188,10 @@ public abstract class AbstractStreamOperator<OUT> this.metrics = environment .getMetricGroup() - .getOrAddOperator(config.getOperatorID(), config.getOperatorName()); + .getOrAddOperator( + config.getOperatorID(), + config.getOperatorName(), + config.getAdditionalMetricVariables()); this.combinedWatermark = IndexedCombinedWatermarkStatus.forInputsCount(2); try { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index dfe4cf73ad8..d89eaf46544 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -119,7 +119,10 @@ public abstract class AbstractStreamOperatorV2<OUT> metrics = environment .getMetricGroup() - .getOrAddOperator(config.getOperatorID(), config.getOperatorName()); + .getOrAddOperator( + config.getOperatorID(), + config.getOperatorName(), + config.getAdditionalMetricVariables()); latencyStats = createLatencyStats( environment.getTaskManagerInfo().getConfiguration(), diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 71b2b53586e..78d202c62c7 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -668,7 +668,9 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> .getEnvironment() .getMetricGroup() .getOrAddOperator( - operatorConfig.getOperatorID(), operatorConfig.getOperatorName()); + operatorConfig.getOperatorID(), + operatorConfig.getOperatorName(), + operatorConfig.getAdditionalMetricVariables()); return operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java index 26510f5fff3..97c2e408469 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java @@ -55,6 +55,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Queue; import java.util.Set; @@ -394,6 +395,7 @@ public class SinkTransformationTranslator<Input, Output> subTransformation, StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME, operatorsUidHashes.getGlobalCommitterUidHash()); + setAdditionalMetricVariablesForWriter(subTransformation); concatUid(subTransformation, subTransformation.getName()); @@ -466,6 +468,19 @@ public class SinkTransformationTranslator<Input, Output> return result; } + /** + * Only writer inherits additional metric variables, as writer's metrics like number of + * records in, busyness, etc. are the most representative of "sink's" metrics. 
+ */ + private void setAdditionalMetricVariablesForWriter(Transformation<?> subTransformation) { + Map<String, String> additionalMetricVariables = + transformation.getAdditionalMetricVariables(); + if (additionalMetricVariables != null + && subTransformation.getName().equals(ConfigConstants.WRITER_NAME)) { + additionalMetricVariables.forEach(subTransformation::addMetricVariable); + } + } + private void setOperatorUidHashIfPossible( Transformation<?> transformation, String writerName, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalOperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalOperatorGroupTest.java index 521fc3efc17..e694ae64fda 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalOperatorGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalOperatorGroupTest.java @@ -30,33 +30,30 @@ import org.apache.flink.runtime.metrics.MetricRegistryTestUtils; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; +import org.apache.flink.runtime.metrics.util.TestingMetricRegistry; + +import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableSet; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; /** Tests for the {@link InternalOperatorMetricGroup}. 
*/ class InternalOperatorGroupTest { - private MetricRegistryImpl registry; + private static final int NUMBER_OF_REPORTERS = 1; + private TestingMetricRegistry registry; @BeforeEach - void setup() { - registry = - new MetricRegistryImpl( - MetricRegistryTestUtils.defaultMetricRegistryConfiguration()); - } - - @AfterEach - void teardown() throws Exception { - if (registry != null) { - registry.closeAsync().get(); - } + public void setup() { + registry = TestingMetricRegistry.builder().setNumberReporters(NUMBER_OF_REPORTERS).build(); } @Test @@ -68,8 +65,11 @@ class InternalOperatorGroupTest { TaskMetricGroup taskGroup = tmGroup.addJob(new JobID(), "myJobName") .addTask(createExecutionAttemptId(new JobVertexID(), 11, 0), "aTaskName"); + Map<String, String> additionalVariables = new HashMap<>(); + additionalVariables.put("foo", "42"); + additionalVariables.put("bar", "44"); InternalOperatorMetricGroup opGroup = - taskGroup.getOrAddOperator(new OperatorID(), "myOpName"); + taskGroup.getOrAddOperator(new OperatorID(), "myOpName", additionalVariables); assertThat(opGroup.getScopeComponents()) .containsExactly( @@ -77,6 +77,16 @@ class InternalOperatorGroupTest { assertThat(opGroup.getMetricIdentifier("name")) .isEqualTo("theHostName.taskmanager.test-tm-id.myJobName.myOpName.11.name"); + + /** + * {@link InternalOperatorMetricGroup#getAllVariables()} and {@link + * InternalOperatorMetricGroup#getAllVariables(int, Set)} work quite differently, so we have + * to test them separately. 
+ */ + assertThat(opGroup.getAllVariables(NUMBER_OF_REPORTERS - 1, ImmutableSet.of("foo"))) + .contains(entry("bar", "44")) + .doesNotContain(entry("foo", "42")); + assertThat(opGroup.getAllVariables()).contains(entry("foo", "42"), entry("bar", "44")); } @Test @@ -99,7 +109,7 @@ class InternalOperatorGroupTest { registry, "theHostName", new ResourceID(tmID)) .addJob(jid, "myJobName") .addTask(createExecutionAttemptId(vertexId, 13, 2), "aTaskname") - .getOrAddOperator(operatorID, operatorName); + .getOrAddOperator(operatorID, operatorName, Collections.emptyMap()); assertThat(operatorGroup.getScopeComponents()) .containsExactly( @@ -127,7 +137,7 @@ class InternalOperatorGroupTest { tmGroup.addJob(new JobID(), "myJobName") .addTask(createExecutionAttemptId(new JobVertexID(), 11, 0), "aTaskName"); InternalOperatorMetricGroup opGroup = - taskGroup.getOrAddOperator(new OperatorID(), "myOpName"); + taskGroup.getOrAddOperator(new OperatorID(), "myOpName", Collections.emptyMap()); assertThat(opGroup.getIOMetricGroup()).isNotNull(); assertThat(opGroup.getIOMetricGroup().getNumRecordsInCounter()).isNotNull(); @@ -146,7 +156,8 @@ class InternalOperatorGroupTest { registry, "theHostName", new ResourceID("test-tm-id")); TaskMetricGroup taskGroup = tmGroup.addJob(jid, "myJobName").addTask(eid, "aTaskName"); - InternalOperatorMetricGroup opGroup = taskGroup.getOrAddOperator(oid, "myOpName"); + InternalOperatorMetricGroup opGroup = + taskGroup.getOrAddOperator(oid, "myOpName", Collections.emptyMap()); Map<String, String> variables = opGroup.getAllVariables(); @@ -180,7 +191,8 @@ class InternalOperatorGroupTest { registry, "host", new ResourceID("id")); TaskMetricGroup task = tm.addJob(jid, "jobname").addTask(eid, "taskName"); - InternalOperatorMetricGroup operator = task.getOrAddOperator(oid, "operator"); + InternalOperatorMetricGroup operator = + task.getOrAddOperator(oid, "operator", Collections.emptyMap()); QueryScopeInfo.OperatorQueryScopeInfo info = 
operator.createQueryServiceMetricInfo(new DummyCharacterFilter()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java index 1460ecab315..80aae91973e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java @@ -412,9 +412,11 @@ class MultipleInputStreamTaskTest { new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() { @Override public InternalOperatorMetricGroup getOrAddOperator( - OperatorID operatorID, String name) { + OperatorID operatorID, + String name, + Map<String, String> additionalVariables) { InternalOperatorMetricGroup operatorMetricGroup = - super.getOrAddOperator(operatorID, name); + super.getOrAddOperator(operatorID, name, additionalVariables); operatorMetrics.put(name, operatorMetricGroup); return operatorMetricGroup; } @@ -747,13 +749,13 @@ class MultipleInputStreamTaskTest { new InterceptingTaskMetricGroup() { @Override public InternalOperatorMetricGroup getOrAddOperator( - OperatorID id, String name) { + OperatorID id, String name, Map<String, String> additionalVariables) { if (id.equals(mainOperatorId)) { return mainOperatorMetricGroup; } else if (id.equals(chainedOperatorId)) { return chainedOperatorMetricGroup; } else { - return super.getOrAddOperator(id, name); + return super.getOrAddOperator(id, name, additionalVariables); } } }; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 68aa069408b..f7bbd997ba0 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -849,13 +849,13 @@ class OneInputStreamTaskTest { new InterceptingTaskMetricGroup() { @Override public InternalOperatorMetricGroup getOrAddOperator( - OperatorID id, String name) { + OperatorID id, String name, Map<String, String> additionalVariables) { if (id.equals(headOperatorId)) { return headOperatorMetricGroup; } else if (id.equals(chainedOperatorId)) { return chainedOperatorMetricGroup; } else { - return super.getOrAddOperator(id, name); + return super.getOrAddOperator(id, name, additionalVariables); } } }; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 803941effb5..710f465a935 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -606,13 +606,13 @@ class TwoInputStreamTaskTest { new InterceptingTaskMetricGroup() { @Override public InternalOperatorMetricGroup getOrAddOperator( - OperatorID id, String name) { + OperatorID id, String name, Map<String, String> additionalVariables) { if (id.equals(headOperatorId)) { return headOperatorMetricGroup; } else if (id.equals(chainedOperatorId)) { return chainedOperatorMetricGroup; } else { - return super.getOrAddOperator(id, name); + return super.getOrAddOperator(id, name, additionalVariables); } } }; diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java index c47de118bb7..c53ced64694 100644 --- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java @@ -52,6 +52,7 @@ import static org.apache.flink.metrics.testutils.MetricAssertions.assertThatCoun import static org.apache.flink.metrics.testutils.MetricAssertions.assertThatGauge; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasSize; /** Tests whether all provided metrics of a {@link Sink} are of the expected values (FLIP-33). */ @@ -104,7 +105,8 @@ public class SinkMetricsITCase extends TestLogger { return i; }) .sinkTo(TestSinkV2.<Long>newBuilder().setWriter(new MetricWriter()).build()) - .name(TEST_LONG_SINK_NAME); + .name(TEST_LONG_SINK_NAME) + .addMetricVariable("foo", "42"); JobClient jobClient = env.executeAsync(); final JobID jobId = jobClient.getJobID(); @@ -131,6 +133,7 @@ public class SinkMetricsITCase extends TestLogger { int subtaskWithMetrics = 0; for (OperatorMetricGroup group : groups) { + assertThat(group.getAllVariables(), hasEntry("foo", "42")); Map<String, Metric> metrics = reporter.getMetricsByGroup(group); // There are only 2 splits assigned; so two groups will not update metrics. if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) {