This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b6eecb6965054e8c1a3600117af0d0976d8f842
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Fri May 24 15:14:27 2024 +0200

    [FLINK-38158][metrics] Add custom scope variables to operators
    
    https://cwiki.apache.org/confluence/display/FLINK/FLIP-484%3A+Add+custom+metric+variables+to+operators
    
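    When running a Flink job that reads data from multiple sources
    and writes to multiple sinks, it is helpful to group the metrics
    of each source/sink instance together, for example by the name
    of the underlying source/sink table or topic.

    To support this, users can now attach custom key/value variables
    to an operator's metric scope via addMetricVariable(key, value)
    on SingleOutputStreamOperator and DataStreamSink. For sinks, only
    the writer operator inherits these variables; the committer and
    global committer do not.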
---
 .../base/source/reader/SourceMetricsITCase.java    |  4 ++
 .../org/apache/flink/api/dag/Transformation.java   | 30 ++++++++++++++
 .../groups/InternalOperatorMetricGroup.java        |  7 +++-
 .../runtime/metrics/groups/TaskMetricGroup.java    | 12 ++++--
 .../metrics/groups/UnregisteredMetricGroups.java   | 13 +++++-
 .../streaming/api/datastream/DataStreamSink.java   | 16 ++++++++
 .../api/datastream/SingleOutputStreamOperator.java | 13 ++++++
 .../api/graph/SimpleTransformationTranslator.java  |  4 ++
 .../flink/streaming/api/graph/StreamConfig.java    | 33 +++++++++++++--
 .../flink/streaming/api/graph/StreamGraph.java     |  8 ++++
 .../streaming/api/graph/StreamGraphGenerator.java  |  4 ++
 .../flink/streaming/api/graph/StreamNode.java      | 10 +++++
 .../api/graph/StreamingJobGraphGenerator.java      |  1 +
 .../api/operators/AbstractStreamOperator.java      |  5 ++-
 .../api/operators/AbstractStreamOperatorV2.java    |  5 ++-
 .../streaming/runtime/tasks/OperatorChain.java     |  4 +-
 .../translators/SinkTransformationTranslator.java  | 15 +++++++
 .../metrics/groups/InternalOperatorGroupTest.java  | 48 ++++++++++++++--------
 .../runtime/tasks/MultipleInputStreamTaskTest.java | 10 +++--
 .../runtime/tasks/OneInputStreamTaskTest.java      |  4 +-
 .../runtime/tasks/TwoInputStreamTaskTest.java      |  4 +-
 .../test/streaming/runtime/SinkMetricsITCase.java  |  5 ++-
 22 files changed, 214 insertions(+), 41 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
index d12df8e7b22..b27d528fa30 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceMetricsITCase.java
@@ -55,6 +55,7 @@ import java.util.concurrent.CyclicBarrier;
 import static 
org.apache.flink.metrics.testutils.MetricAssertions.assertThatCounter;
 import static 
org.apache.flink.metrics.testutils.MetricAssertions.assertThatGauge;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
 
 /** Tests whether all provided metrics of a {@link Source} are of the expected 
values (FLIP-33). */
 class SourceMetricsITCase {
@@ -113,6 +114,7 @@ class SourceMetricsITCase {
         int stopAtRecord2 = numRecordsPerSplit - 1;
         DataStream<Integer> stream =
                 env.fromSource(source, strategy, "MetricTestingSource")
+                        .addMetricVariable("foo", "42")
                         .map(
                                 i -> {
                                     if (i % numRecordsPerSplit == stopAtRecord1
@@ -165,6 +167,8 @@ class SourceMetricsITCase {
 
         int subtaskWithMetrics = 0;
         for (OperatorMetricGroup group : groups) {
+            assertThat(group.getAllVariables()).contains(entry("foo", "42"));
+
             Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
             // there are only 2 splits assigned; so two groups will not update 
metrics
             if (group.getIOMetricGroup().getNumRecordsInCounter().getCount() 
== 0) {
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java 
b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
index bce2b749e7a..858fe3c9212 100644
--- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
@@ -118,6 +118,7 @@ public abstract class Transformation<T> {
     // If true, the parallelism of the transformation is explicitly set and 
should be respected.
     // Otherwise the parallelism can be changed at runtime.
     private boolean parallelismConfigured;
+    private @Nullable Map<String, String> additionalMetricVariables;
 
     public static int getNewNodeId() {
         return ID_COUNTER.incrementAndGet();
@@ -456,6 +457,34 @@ public abstract class Transformation<T> {
         return uid;
     }
 
+    /**
+     * @return additional variables that will be added to scope of the metrics 
reported from this
+     *     {@link Transformation}.
+     */
+    public @Nullable Map<String, String> getAdditionalMetricVariables() {
+        return additionalMetricVariables;
+    }
+
+    /**
+     * Adds additional variables that will be added to scope of the metrics 
reported from this
+     * operator.
+     *
+     * <p>Some transformations might be translated into multiple operators and 
in such cases, metric
+     * variables might be assigned to just one specific operator. For example 
{@code
+     * SinkTransformation}'s additional variables are only inherited by the 
writer operator. They
+     * are not used for committer or global committer.
+     *
+     * @param key
+     * @param value
+     */
+    public Transformation<T> addMetricVariable(String key, String value) {
+        if (additionalMetricVariables == null) {
+            additionalMetricVariables = new HashMap<>();
+        }
+        additionalMetricVariables.put(key, value);
+        return this;
+    }
+
     /**
      * Returns the slot sharing group of this transformation if present.
      *
@@ -652,6 +681,7 @@ public abstract class Transformation<T> {
         Transformation<?> that = (Transformation<?>) o;
         return Objects.equals(bufferTimeout, that.bufferTimeout)
                 && Objects.equals(id, that.id)
+                && Objects.equals(additionalMetricVariables, 
that.additionalMetricVariables)
                 && Objects.equals(parallelism, that.parallelism)
                 && Objects.equals(name, that.name)
                 && Objects.equals(outputType, that.outputType);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
index 87918f1f564..840626134a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java
@@ -38,14 +38,15 @@ public class InternalOperatorMetricGroup extends 
ComponentMetricGroup<TaskMetric
         implements OperatorMetricGroup {
     private final String operatorName;
     private final OperatorID operatorID;
-
     private final InternalOperatorIOMetricGroup ioMetrics;
+    private final Map<String, String> additionalVariables;
 
     InternalOperatorMetricGroup(
             MetricRegistry registry,
             TaskMetricGroup parent,
             OperatorID operatorID,
-            String operatorName) {
+            String operatorName,
+            Map<String, String> additionalVariables) {
         super(
                 registry,
                 registry.getScopeFormats()
@@ -54,6 +55,7 @@ public class InternalOperatorMetricGroup extends 
ComponentMetricGroup<TaskMetric
                 parent);
         this.operatorID = operatorID;
         this.operatorName = operatorName;
+        this.additionalVariables = additionalVariables;
 
         ioMetrics = new InternalOperatorIOMetricGroup(this);
     }
@@ -97,6 +99,7 @@ public class InternalOperatorMetricGroup extends 
ComponentMetricGroup<TaskMetric
     protected void putVariables(Map<String, String> variables) {
         variables.put(ScopeFormat.SCOPE_OPERATOR_ID, 
String.valueOf(operatorID));
         variables.put(ScopeFormat.SCOPE_OPERATOR_NAME, operatorName);
+        variables.putAll(additionalVariables);
         // we don't enter the subtask_index as the task group does that already
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 4967aafeae8..202c24276cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -144,11 +145,12 @@ public class TaskMetricGroup extends 
ComponentMetricGroup<TaskManagerJobMetricGr
     // ------------------------------------------------------------------------
 
     public InternalOperatorMetricGroup getOrAddOperator(String operatorName) {
-        return getOrAddOperator(OperatorID.fromJobVertexID(vertexId), 
operatorName);
+        return getOrAddOperator(
+                OperatorID.fromJobVertexID(vertexId), operatorName, 
Collections.emptyMap());
     }
 
     public InternalOperatorMetricGroup getOrAddOperator(
-            OperatorID operatorID, String operatorName) {
+            OperatorID operatorID, String operatorName, Map<String, String> 
additionalVariables) {
         final String truncatedOperatorName = 
MetricUtils.truncateOperatorName(operatorName);
 
         // unique OperatorIDs only exist in streaming, so we have to rely on 
the name for batch
@@ -160,7 +162,11 @@ public class TaskMetricGroup extends 
ComponentMetricGroup<TaskManagerJobMetricGr
                     key,
                     operator ->
                             new InternalOperatorMetricGroup(
-                                    this.registry, this, operatorID, 
truncatedOperatorName));
+                                    this.registry,
+                                    this,
+                                    operatorID,
+                                    truncatedOperatorName,
+                                    additionalVariables));
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
index 345c9aeea3c..6c9446d2256 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
@@ -25,6 +25,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.util.AbstractID;
 
+import java.util.Collections;
+import java.util.Map;
+
 import static 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID.randomId;
 
 /** A collection of safe drop-in replacements for existing {@link 
ComponentMetricGroup}s. */
@@ -190,7 +193,8 @@ public class UnregisteredMetricGroups {
         }
 
         @Override
-        public InternalOperatorMetricGroup getOrAddOperator(OperatorID 
operatorID, String name) {
+        public InternalOperatorMetricGroup getOrAddOperator(
+                OperatorID operatorID, String name, Map<String, String> 
additionalVariables) {
             return createUnregisteredOperatorMetricGroup(this);
         }
     }
@@ -205,7 +209,12 @@ public class UnregisteredMetricGroups {
         }
 
         UnregisteredOperatorMetricGroup(TaskMetricGroup parent) {
-            super(NoOpMetricRegistry.INSTANCE, parent, DEFAULT_OPERATOR_ID, 
DEFAULT_OPERATOR_NAME);
+            super(
+                    NoOpMetricRegistry.INSTANCE,
+                    parent,
+                    DEFAULT_OPERATOR_ID,
+                    DEFAULT_OPERATOR_NAME,
+                    Collections.emptyMap());
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 42309b23520..8968b9a533e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -139,6 +139,22 @@ public class DataStreamSink<T> {
         return this;
     }
 
+    /**
+     * Adds additional variables that will be added to scope of the metrics 
reported from this
+     * operator.
+     *
+     * <p>The specified additional variables are only used for the writer 
operator. They are not
+     * used for committer or global committer.
+     *
+     * @param key
+     * @param value
+     */
+    @PublicEvolving
+    public DataStreamSink<T> addMetricVariable(String key, String value) {
+        transformation.addMetricVariable(key, value);
+        return this;
+    }
+
     /**
      * Sets an user provided hash for this operator. This will be used AS IS 
the create the
      * JobVertexID.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 42bb8f30efe..e1da1ee8f02 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -102,6 +102,19 @@ public class SingleOutputStreamOperator<T> extends 
DataStream<T> {
         return this;
     }
 
+    /**
+     * Adds additional variables that will be added to scope of the metrics 
reported from this
+     * operator.
+     *
+     * @param key
+     * @param value
+     */
+    @PublicEvolving
+    public SingleOutputStreamOperator<T> addMetricVariable(String key, String 
value) {
+        transformation.addMetricVariable(key, value);
+        return this;
+    }
+
     /**
      * Sets an user provided hash for this operator. This will be used AS IS 
the create the
      * JobVertexID.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
index 90b154dbb46..098bf5a3343 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
@@ -101,6 +101,10 @@ public abstract class SimpleTransformationTranslator<OUT, 
T extends Transformati
         if (transformation.getUid() != null) {
             streamGraph.setTransformationUID(transformationId, 
transformation.getUid());
         }
+        if (transformation.getAdditionalMetricVariables() != null) {
+            streamGraph.setAdditionalMetricVariables(
+                    transformation.getId(), 
transformation.getAdditionalMetricVariables());
+        }
         if (transformation.getUserProvidedNodeHash() != null) {
             streamGraph.setTransformationUserHash(
                     transformationId, 
transformation.getUserProvidedNodeHash());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 72e8c4e38e5..96295c38486 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -43,6 +43,8 @@ import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.concurrent.FutureUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -117,6 +119,7 @@ public class StreamConfig implements Serializable {
     private static final ConfigOption<Boolean> GRAPH_CONTAINING_LOOPS =
             
ConfigOptions.key("graphContainingLoops").booleanType().defaultValue(false);
 
+    private static final String ADDITIONAL_METRIC_VARIABLES = 
"additionalmetricvariables";
     private static final String CHECKPOINT_STORAGE = "checkpointstorage";
     private static final String STATE_BACKEND = "statebackend";
     private static final String TIMER_SERVICE_PROVIDER = "timerservice";
@@ -635,6 +638,28 @@ public class StreamConfig implements Serializable {
         }
     }
 
+    public void setAdditionalMetricVariables(
+            @Nullable Map<String, String> additionalMetricVariables) {
+        if (additionalMetricVariables != null) {
+            toBeSerializedConfigObjects.put(ADDITIONAL_METRIC_VARIABLES, 
additionalMetricVariables);
+        }
+    }
+
+    public Map<String, String> getAdditionalMetricVariables() {
+        try {
+            Map<String, String> additionalMetricVariables =
+                    InstantiationUtil.readObjectFromConfig(
+                            this.config,
+                            ADDITIONAL_METRIC_VARIABLES,
+                            this.getClass().getClassLoader());
+            return additionalMetricVariables == null
+                    ? Collections.emptyMap()
+                    : additionalMetricVariables;
+        } catch (Exception e) {
+            throw new StreamTaskException("Could not instantiate additional 
metric variables.", e);
+        }
+    }
+
     @VisibleForTesting
     public void setCheckpointStorage(CheckpointStorage storage) {
         if (storage != null) {
@@ -879,9 +904,9 @@ public class StreamConfig implements Serializable {
         }
     }
 
-    public static boolean requiresSorting(StreamConfig.InputConfig 
inputConfig) {
-        return inputConfig instanceof StreamConfig.NetworkInputConfig
-                && ((StreamConfig.NetworkInputConfig) 
inputConfig).getInputRequirement()
-                        == StreamConfig.InputRequirement.SORTED;
+    public static boolean requiresSorting(InputConfig inputConfig) {
+        return inputConfig instanceof NetworkInputConfig
+                && ((NetworkInputConfig) inputConfig).getInputRequirement()
+                        == InputRequirement.SORTED;
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 2b12308ef4a..a64d534ba93 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -1080,6 +1080,14 @@ public class StreamGraph implements Pipeline, 
ExecutionPlan {
         }
     }
 
+    public void setAdditionalMetricVariables(
+            Integer nodeId, Map<String, String> additionalMetricVariables) {
+        StreamNode node = streamNodes.get(nodeId);
+        if (node != null) {
+            node.setAdditionalMetricVariables(additionalMetricVariables);
+        }
+    }
+
     void setTransformationUserHash(Integer nodeId, String nodeHash) {
         StreamNode node = streamNodes.get(nodeId);
         if (node != null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index b0003d32483..f35da5d5c02 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -546,6 +546,10 @@ public class StreamGraphGenerator {
         if (transform.getUid() != null) {
             streamGraph.setTransformationUID(transform.getId(), 
transform.getUid());
         }
+        if (transform.getAdditionalMetricVariables() != null) {
+            streamGraph.setAdditionalMetricVariables(
+                    transform.getId(), 
transform.getAdditionalMetricVariables());
+        }
         if (transform.getUserProvidedNodeHash() != null) {
             streamGraph.setTransformationUserHash(
                     transform.getId(), transform.getUserProvidedNodeHash());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 34fb5a57f9c..bf18fe082e9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -99,6 +99,8 @@ public class StreamNode implements Serializable {
 
     private final Map<Integer, StreamConfig.InputRequirement> 
inputRequirements = new HashMap<>();
 
+    private @Nullable Map<String, String> additionalMetricVariables;
+
     private @Nullable IntermediateDataSetID consumeClusterDatasetId;
 
     private boolean supportsConcurrentExecutionAttempts = true;
@@ -408,6 +410,14 @@ public class StreamNode implements Serializable {
         return inputRequirements;
     }
 
+    public @Nullable Map<String, String> getAdditionalMetricVariables() {
+        return additionalMetricVariables;
+    }
+
+    public void setAdditionalMetricVariables(Map<String, String> 
additionalMetricVariables) {
+        this.additionalMetricVariables = additionalMetricVariables;
+    }
+
     public Optional<OperatorCoordinator.Provider> getCoordinatorProvider(
             String operatorName, OperatorID operatorID) {
         if (operatorFactory != null && operatorFactory instanceof 
CoordinatedOperatorFactory) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 199046274bc..1aa62761d43 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1253,6 +1253,7 @@ public class StreamingJobGraphGenerator {
             config.setStatePartitioner(i, vertex.getStatePartitioners()[i]);
         }
         config.setStateKeySerializer(vertex.getStateKeySerializer());
+        
config.setAdditionalMetricVariables(vertex.getAdditionalMetricVariables());
 
         Class<? extends TaskInvokable> vertexClass = 
vertex.getJobVertexClass();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 484b4512573..a4f2a756384 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -188,7 +188,10 @@ public abstract class AbstractStreamOperator<OUT>
         this.metrics =
                 environment
                         .getMetricGroup()
-                        .getOrAddOperator(config.getOperatorID(), 
config.getOperatorName());
+                        .getOrAddOperator(
+                                config.getOperatorID(),
+                                config.getOperatorName(),
+                                config.getAdditionalMetricVariables());
         this.combinedWatermark = 
IndexedCombinedWatermarkStatus.forInputsCount(2);
 
         try {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index dfe4cf73ad8..d89eaf46544 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -119,7 +119,10 @@ public abstract class AbstractStreamOperatorV2<OUT>
         metrics =
                 environment
                         .getMetricGroup()
-                        .getOrAddOperator(config.getOperatorID(), 
config.getOperatorName());
+                        .getOrAddOperator(
+                                config.getOperatorID(),
+                                config.getOperatorName(),
+                                config.getAdditionalMetricVariables());
         latencyStats =
                 createLatencyStats(
                         environment.getTaskManagerInfo().getConfiguration(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 71b2b53586e..78d202c62c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -668,7 +668,9 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
                         .getEnvironment()
                         .getMetricGroup()
                         .getOrAddOperator(
-                                operatorConfig.getOperatorID(), 
operatorConfig.getOperatorName());
+                                operatorConfig.getOperatorID(),
+                                operatorConfig.getOperatorName(),
+                                operatorConfig.getAdditionalMetricVariables());
 
         return 
operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index 26510f5fff3..97c2e408469 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -55,6 +55,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
@@ -394,6 +395,7 @@ public class SinkTransformationTranslator<Input, Output>
                         subTransformation,
                         
StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME,
                         operatorsUidHashes.getGlobalCommitterUidHash());
+                setAdditionalMetricVariablesForWriter(subTransformation);
 
                 concatUid(subTransformation, subTransformation.getName());
 
@@ -466,6 +468,19 @@ public class SinkTransformationTranslator<Input, Output>
             return result;
         }
 
+        /**
+         * Only writer inherits additional metric variables, as writer's 
metrics like number of
+         * records in, busyness, etc. are the most representative of "sink's" 
metrics.
+         */
+        private void setAdditionalMetricVariablesForWriter(Transformation<?> 
subTransformation) {
+            Map<String, String> additionalMetricVariables =
+                    transformation.getAdditionalMetricVariables();
+            if (additionalMetricVariables != null
+                    && 
subTransformation.getName().equals(ConfigConstants.WRITER_NAME)) {
+                
additionalMetricVariables.forEach(subTransformation::addMetricVariable);
+            }
+        }
+
         private void setOperatorUidHashIfPossible(
                 Transformation<?> transformation,
                 String writerName,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalOperatorGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalOperatorGroupTest.java
index 521fc3efc17..e694ae64fda 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalOperatorGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalOperatorGroupTest.java
@@ -30,33 +30,30 @@ import 
org.apache.flink.runtime.metrics.MetricRegistryTestUtils;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+
+import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableSet;
 
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
 
 /** Tests for the {@link InternalOperatorMetricGroup}. */
 class InternalOperatorGroupTest {
 
-    private MetricRegistryImpl registry;
+    private static final int NUMBER_OF_REPORTERS = 1;
+    private TestingMetricRegistry registry;
 
     @BeforeEach
-    void setup() {
-        registry =
-                new MetricRegistryImpl(
-                        
MetricRegistryTestUtils.defaultMetricRegistryConfiguration());
-    }
-
-    @AfterEach
-    void teardown() throws Exception {
-        if (registry != null) {
-            registry.closeAsync().get();
-        }
+    public void setup() {
+        registry = 
TestingMetricRegistry.builder().setNumberReporters(NUMBER_OF_REPORTERS).build();
     }
 
     @Test
@@ -68,8 +65,11 @@ class InternalOperatorGroupTest {
         TaskMetricGroup taskGroup =
                 tmGroup.addJob(new JobID(), "myJobName")
                         .addTask(createExecutionAttemptId(new JobVertexID(), 
11, 0), "aTaskName");
+        Map<String, String> additionalVariables = new HashMap<>();
+        additionalVariables.put("foo", "42");
+        additionalVariables.put("bar", "44");
         InternalOperatorMetricGroup opGroup =
-                taskGroup.getOrAddOperator(new OperatorID(), "myOpName");
+                taskGroup.getOrAddOperator(new OperatorID(), "myOpName", 
additionalVariables);
 
         assertThat(opGroup.getScopeComponents())
                 .containsExactly(
@@ -77,6 +77,16 @@ class InternalOperatorGroupTest {
 
         assertThat(opGroup.getMetricIdentifier("name"))
                 
.isEqualTo("theHostName.taskmanager.test-tm-id.myJobName.myOpName.11.name");
+
+        /**
+         * {@link InternalOperatorMetricGroup#getAllVariables()} and {@link
+         * InternalOperatorMetricGroup#getAllVariables(int, Set)} work quite 
differently, so we have
+         * to test them separately.
+         */
+        assertThat(opGroup.getAllVariables(NUMBER_OF_REPORTERS - 1, 
ImmutableSet.of("foo")))
+                .contains(entry("bar", "44"))
+                .doesNotContain(entry("foo", "42"));
+        assertThat(opGroup.getAllVariables()).contains(entry("foo", "42"), 
entry("bar", "44"));
     }
 
     @Test
@@ -99,7 +109,7 @@ class InternalOperatorGroupTest {
                                     registry, "theHostName", new 
ResourceID(tmID))
                             .addJob(jid, "myJobName")
                             .addTask(createExecutionAttemptId(vertexId, 13, 
2), "aTaskname")
-                            .getOrAddOperator(operatorID, operatorName);
+                            .getOrAddOperator(operatorID, operatorName, 
Collections.emptyMap());
 
             assertThat(operatorGroup.getScopeComponents())
                     .containsExactly(
@@ -127,7 +137,7 @@ class InternalOperatorGroupTest {
                 tmGroup.addJob(new JobID(), "myJobName")
                         .addTask(createExecutionAttemptId(new JobVertexID(), 
11, 0), "aTaskName");
         InternalOperatorMetricGroup opGroup =
-                taskGroup.getOrAddOperator(new OperatorID(), "myOpName");
+                taskGroup.getOrAddOperator(new OperatorID(), "myOpName", 
Collections.emptyMap());
 
         assertThat(opGroup.getIOMetricGroup()).isNotNull();
         
assertThat(opGroup.getIOMetricGroup().getNumRecordsInCounter()).isNotNull();
@@ -146,7 +156,8 @@ class InternalOperatorGroupTest {
                         registry, "theHostName", new ResourceID("test-tm-id"));
 
         TaskMetricGroup taskGroup = tmGroup.addJob(jid, 
"myJobName").addTask(eid, "aTaskName");
-        InternalOperatorMetricGroup opGroup = taskGroup.getOrAddOperator(oid, 
"myOpName");
+        InternalOperatorMetricGroup opGroup =
+                taskGroup.getOrAddOperator(oid, "myOpName", 
Collections.emptyMap());
 
         Map<String, String> variables = opGroup.getAllVariables();
 
@@ -180,7 +191,8 @@ class InternalOperatorGroupTest {
                         registry, "host", new ResourceID("id"));
 
         TaskMetricGroup task = tm.addJob(jid, "jobname").addTask(eid, 
"taskName");
-        InternalOperatorMetricGroup operator = task.getOrAddOperator(oid, 
"operator");
+        InternalOperatorMetricGroup operator =
+                task.getOrAddOperator(oid, "operator", Collections.emptyMap());
 
         QueryScopeInfo.OperatorQueryScopeInfo info =
                 operator.createQueryServiceMetricInfo(new 
DummyCharacterFilter());
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 1460ecab315..80aae91973e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -412,9 +412,11 @@ class MultipleInputStreamTaskTest {
                 new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
                     @Override
                     public InternalOperatorMetricGroup getOrAddOperator(
-                            OperatorID operatorID, String name) {
+                            OperatorID operatorID,
+                            String name,
+                            Map<String, String> additionalVariables) {
                         InternalOperatorMetricGroup operatorMetricGroup =
-                                super.getOrAddOperator(operatorID, name);
+                                super.getOrAddOperator(operatorID, name, 
additionalVariables);
                         operatorMetrics.put(name, operatorMetricGroup);
                         return operatorMetricGroup;
                     }
@@ -747,13 +749,13 @@ class MultipleInputStreamTaskTest {
                 new InterceptingTaskMetricGroup() {
                     @Override
                     public InternalOperatorMetricGroup getOrAddOperator(
-                            OperatorID id, String name) {
+                            OperatorID id, String name, Map<String, String> 
additionalVariables) {
                         if (id.equals(mainOperatorId)) {
                             return mainOperatorMetricGroup;
                         } else if (id.equals(chainedOperatorId)) {
                             return chainedOperatorMetricGroup;
                         } else {
-                            return super.getOrAddOperator(id, name);
+                            return super.getOrAddOperator(id, name, 
additionalVariables);
                         }
                     }
                 };
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 68aa069408b..f7bbd997ba0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -849,13 +849,13 @@ class OneInputStreamTaskTest {
                 new InterceptingTaskMetricGroup() {
                     @Override
                     public InternalOperatorMetricGroup getOrAddOperator(
-                            OperatorID id, String name) {
+                            OperatorID id, String name, Map<String, String> 
additionalVariables) {
                         if (id.equals(headOperatorId)) {
                             return headOperatorMetricGroup;
                         } else if (id.equals(chainedOperatorId)) {
                             return chainedOperatorMetricGroup;
                         } else {
-                            return super.getOrAddOperator(id, name);
+                            return super.getOrAddOperator(id, name, 
additionalVariables);
                         }
                     }
                 };
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 803941effb5..710f465a935 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -606,13 +606,13 @@ class TwoInputStreamTaskTest {
                 new InterceptingTaskMetricGroup() {
                     @Override
                     public InternalOperatorMetricGroup getOrAddOperator(
-                            OperatorID id, String name) {
+                            OperatorID id, String name, Map<String, String> 
additionalVariables) {
                         if (id.equals(headOperatorId)) {
                             return headOperatorMetricGroup;
                         } else if (id.equals(chainedOperatorId)) {
                             return chainedOperatorMetricGroup;
                         } else {
-                            return super.getOrAddOperator(id, name);
+                            return super.getOrAddOperator(id, name, 
additionalVariables);
                         }
                     }
                 };
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index c47de118bb7..c53ced64694 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -52,6 +52,7 @@ import static 
org.apache.flink.metrics.testutils.MetricAssertions.assertThatCoun
 import static 
org.apache.flink.metrics.testutils.MetricAssertions.assertThatGauge;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasSize;
 
 /** Tests whether all provided metrics of a {@link Sink} are of the expected 
values (FLIP-33). */
@@ -104,7 +105,8 @@ public class SinkMetricsITCase extends TestLogger {
                             return i;
                         })
                 .sinkTo(TestSinkV2.<Long>newBuilder().setWriter(new 
MetricWriter()).build())
-                .name(TEST_LONG_SINK_NAME);
+                .name(TEST_LONG_SINK_NAME)
+                .addMetricVariable("foo", "42");
         JobClient jobClient = env.executeAsync();
         final JobID jobId = jobClient.getJobID();
 
@@ -131,6 +133,7 @@ public class SinkMetricsITCase extends TestLogger {
 
         int subtaskWithMetrics = 0;
         for (OperatorMetricGroup group : groups) {
+            assertThat(group.getAllVariables(), hasEntry("foo", "42"));
             Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
             // There are only 2 splits assigned; so two groups will not update 
metrics.
             if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() 
== 0) {


Reply via email to