This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b75d76e2f94 [FLINK-36946][Metrics] Optimize sink operator name truncate (#25832)
b75d76e2f94 is described below

commit b75d76e2f9484473285121d5d7af96794a2e4767
Author: zhangmang <zhangma...@163.com>
AuthorDate: Fri Jan 10 10:57:18 2025 +0800

    [FLINK-36946][Metrics] Optimize sink operator name truncate (#25832)
---
 .../flink/configuration/ConfigConstants.java       |  7 +++
 .../metrics/groups/JobManagerJobMetricGroup.java   | 18 ++-----
 .../runtime/metrics/groups/TaskMetricGroup.java    | 14 +----
 .../flink/runtime/metrics/util/MetricUtils.java    | 32 ++++++++++++
 .../translators/SinkTransformationTranslator.java  | 20 +++----
 .../metrics/groups/TaskMetricGroupTest.java        |  5 +-
 .../runtime/metrics/util/MetricUtilsTest.java      | 61 ++++++++++++++++++++++
 .../test/streaming/runtime/SinkMetricsITCase.java  | 15 +++---
 8 files changed, 127 insertions(+), 45 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 220a1ddcf73..64b445d2cce 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -111,6 +111,13 @@ public final class ConfigConstants {
 
     public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
 
+    /** The suffix of sink operator name. */
+    public static final String COMMITTER_NAME = "Committer";
+
+    public static final String WRITER_NAME = "Writer";
+
+    public static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
+
     /** Not instantiable. */
     private ConfigConstants() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
index 9b98f5ad149..bf9b65879b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerJobMetricGroup.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
@@ -30,7 +31,6 @@ import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.runtime.metrics.groups.TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -62,7 +62,7 @@ public class JobManagerJobMetricGroup extends JobMetricGroup<JobManagerMetricGro
 
     public JobManagerOperatorMetricGroup getOrAddOperator(
             AbstractID vertexId, String taskName, OperatorID operatorID, String operatorName) {
-        final String truncatedOperatorName = getTruncatedOperatorName(operatorName);
+        final String truncatedOperatorName = MetricUtils.truncateOperatorName(operatorName);
 
         // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
         // operators
@@ -82,25 +82,13 @@ public class JobManagerJobMetricGroup extends JobMetricGroup<JobManagerMetricGro
         }
     }
 
-    private String getTruncatedOperatorName(String operatorName) {
-        if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
-            LOG.warn(
-                    "The operator name {} exceeded the {} characters length limit and was truncated.",
-                    operatorName,
-                    METRICS_OPERATOR_NAME_MAX_LENGTH);
-            return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
-        } else {
-            return operatorName;
-        }
-    }
-
     @VisibleForTesting
     int numRegisteredOperatorMetricGroups() {
         return operators.size();
     }
 
     void removeOperatorMetricGroup(OperatorID operatorID, String operatorName) {
-        final String truncatedOperatorName = getTruncatedOperatorName(operatorName);
+        final String truncatedOperatorName = MetricUtils.truncateOperatorName(operatorName);
 
         // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
         // operators
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index afcbbaa44bf..34c2f7deaa9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.util.AbstractID;
 
 import javax.annotation.Nullable;
@@ -45,8 +46,6 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 
     private final Map<String, InternalOperatorMetricGroup> operators = new HashMap<>();
 
-    static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
-
     private final TaskIOMetricGroup ioMetrics;
 
     /**
@@ -150,16 +149,7 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 
     public InternalOperatorMetricGroup getOrAddOperator(
             OperatorID operatorID, String operatorName) {
-        final String truncatedOperatorName;
-        if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
-            LOG.warn(
-                    "The operator name {} exceeded the {} characters length limit and was truncated.",
-                    operatorName,
-                    METRICS_OPERATOR_NAME_MAX_LENGTH);
-            truncatedOperatorName = operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
-        } else {
-            truncatedOperatorName = operatorName;
-        }
+        final String truncatedOperatorName = MetricUtils.truncateOperatorName(operatorName);
 
         // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
         // operators
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 356cee13e91..6b0c818fad6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.metrics.util;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
@@ -65,6 +66,7 @@ import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.configuration.ConfigConstants.METRICS_OPERATOR_NAME_MAX_LENGTH;
 import static org.apache.flink.runtime.metrics.util.SystemResourcesMetricsInitializer.instantiateSystemMetrics;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -83,6 +85,8 @@ public class MetricUtils {
     @VisibleForTesting static final String METRIC_GROUP_MEMORY = "Memory";
 
     @VisibleForTesting static final String METRIC_GROUP_MANAGED_MEMORY = "Managed";
+    private static final String WRITER_SUFFIX = ": " + ConfigConstants.WRITER_NAME;
+    private static final String COMMITTER_SUFFIX = ": " + ConfigConstants.COMMITTER_NAME;
 
     private MetricUtils() {}
 
@@ -366,6 +370,34 @@ public class MetricUtils {
         }
     }
 
+    public static String truncateOperatorName(String operatorName) {
+        if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
+            LOG.warn(
+                    "The operator name {} exceeded the {} characters length limit and was truncated.",
+                    operatorName,
+                    METRICS_OPERATOR_NAME_MAX_LENGTH);
+            if (operatorName.endsWith(WRITER_SUFFIX)) {
+                return operatorName.substring(
+                                0,
+                                Math.max(
+                                        0,
+                                        METRICS_OPERATOR_NAME_MAX_LENGTH - WRITER_SUFFIX.length()))
+                        + WRITER_SUFFIX;
+            }
+            if (operatorName.endsWith(COMMITTER_SUFFIX)) {
+                return operatorName.substring(
+                                0,
+                                Math.max(
+                                        0,
+                                        METRICS_OPERATOR_NAME_MAX_LENGTH
+                                                - COMMITTER_SUFFIX.length()))
+                        + COMMITTER_SUFFIX;
+            }
+            return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
+        }
+        return operatorName;
+    }
+
     private static final class AttributeGauge<T> implements Gauge<T> {
         private final MBeanServer server;
         private final ObjectName objectName;
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index dd41218a497..4dfdf2ca319 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SupportsCommitter;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
@@ -72,9 +73,6 @@ import static org.apache.flink.util.Preconditions.checkState;
 public class SinkTransformationTranslator<Input, Output>
         implements TransformationTranslator<Output, SinkTransformation<Input, Output>> {
 
-    private static final String COMMITTER_NAME = "Committer";
-    private static final String WRITER_NAME = "Writer";
-
     @Override
     public Collection<Integer> translateForBatch(
             SinkTransformation<Input, Output> transformation, Context context) {
@@ -170,7 +168,7 @@ public class SinkTransformationTranslator<Input, Output>
                         prewritten,
                         input ->
                                 input.transform(
-                                        WRITER_NAME,
+                                        ConfigConstants.WRITER_NAME,
                                         CommittableMessageTypeInfo.noOutput(),
                                         new SinkWriterOperatorFactory<>(sink)),
                         false,
@@ -284,7 +282,7 @@ public class SinkTransformationTranslator<Input, Output>
                             precommitted,
                             pc ->
                                     pc.transform(
-                                            COMMITTER_NAME,
+                                            ConfigConstants.COMMITTER_NAME,
                                             committableTypeInformation,
                                             new CommitterOperatorFactory<>(
                                                     committingSink,
@@ -315,7 +313,7 @@ public class SinkTransformationTranslator<Input, Output>
                             inputStream,
                             input ->
                                     input.transform(
-                                            WRITER_NAME,
+                                            ConfigConstants.WRITER_NAME,
                                             typeInformation,
                                             new SinkWriterOperatorFactory<>(sink)),
                             false,
@@ -383,10 +381,12 @@ public class SinkTransformationTranslator<Input, Output>
 
                 // Set the operator uid hashes to support stateful upgrades without prior uids
                 setOperatorUidHashIfPossible(
-                        subTransformation, WRITER_NAME, operatorsUidHashes.getWriterUidHash());
+                        subTransformation,
+                        ConfigConstants.WRITER_NAME,
+                        operatorsUidHashes.getWriterUidHash());
                 setOperatorUidHashIfPossible(
                         subTransformation,
-                        COMMITTER_NAME,
+                        ConfigConstants.COMMITTER_NAME,
                         operatorsUidHashes.getCommitterUidHash());
                 setOperatorUidHashIfPossible(
                         subTransformation,
@@ -479,14 +479,14 @@ public class SinkTransformationTranslator<Input, Output>
             if (transformationName != null && transformation.getUid() != null) {
                 // Use the same uid pattern than for Sink V1. We deliberately decided to use the uid
                 // pattern of Flink 1.13 because 1.14 did not have a dedicated committer operator.
-                if (transformationName.equals(COMMITTER_NAME)) {
+                if (transformationName.equals(ConfigConstants.COMMITTER_NAME)) {
                     final String committerFormat = "Sink Committer: %s";
                     subTransformation.setUid(
                             String.format(committerFormat, transformation.getUid()));
                     return;
                 }
                 // Set the writer operator uid to the sinks uid to support state migrations
-                if (transformationName.equals(WRITER_NAME)) {
+                if (transformationName.equals(ConfigConstants.WRITER_NAME)) {
                     subTransformation.setUid(transformation.getUid());
                     return;
                 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index e2df58b75a8..61c7015b47b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Metric;
@@ -200,8 +201,8 @@ class TaskMetricGroupTest {
                 taskMetricGroup.getOrAddOperator(originalName);
 
         String storedName = operatorMetricGroup.getScopeComponents()[0];
-        assertThat(storedName.length()).isEqualTo(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH);
-        assertThat(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH))
+        assertThat(storedName.length()).isEqualTo(ConfigConstants.METRICS_OPERATOR_NAME_MAX_LENGTH);
+        assertThat(originalName.substring(0, ConfigConstants.METRICS_OPERATOR_NAME_MAX_LENGTH))
                 .isEqualTo(storedName);
         registry.closeAsync().get();
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
index e2182aa9daf..ae2f50cbbde 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
@@ -283,6 +283,67 @@ class MetricUtilsTest {
         }
     }
 
+    @Test
+    void testTruncateOperatorName() {
+        // test operator name is null
+        assertThat(MetricUtils.truncateOperatorName(null)).isNull();
+        // test operator name length less than 80
+        final String operatorNameLess = "testOperatorName";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameLess)).isEqualTo(operatorNameLess);
+        // test operator name length less than 80 and end with : Writer
+        final String operatorNameLessEndWithWriter = "testOperatorName: Writer";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameLessEndWithWriter))
+                .isEqualTo(operatorNameLessEndWithWriter);
+        // test operator name length less than 80 and end with : Committer
+        final String operatorNameLessEndWithCommitter = "testOperatorName: Committer";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameLessEndWithCommitter))
+                .isEqualTo(operatorNameLessEndWithCommitter);
+        // test operator name length less than 80 and contains with : Writer
+        final String operatorNameLessAndContainsWriter = "test: WriterOperatorName";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameLessAndContainsWriter))
+                .isEqualTo(operatorNameLessAndContainsWriter);
+        // test operator name length less than 80 and contains with : Committer
+        final String operatorNameLessAndContainsCommitter = "test: CommitterOperatorName";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameLessAndContainsCommitter))
+                .isEqualTo(operatorNameLessAndContainsCommitter);
+
+        // test operator name length more than 80
+        final String operatorNameMore =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongOperatorName";
+        final String expectedOperatorNameMore =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameMore))
+                .isEqualTo(expectedOperatorNameMore);
+
+        // test operator name length more than 80 and end with : Writer
+        final String operatorNameMoreEndWithWriter =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongOperatorName: Writer";
+        final String expectedOperatorNameMoreEndWithWriter =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong: Writer";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameMoreEndWithWriter))
+                .isEqualTo(expectedOperatorNameMoreEndWithWriter);
+
+        // test operator name length more than 80 and end with : Committer
+        final String operatorNameMoreEndWithCommitter =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongOperatorName: Committer";
+        final String expectedOperatorNameMoreEndWithCommitter =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongL: Committer";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameMoreEndWithCommitter))
+                .isEqualTo(expectedOperatorNameMoreEndWithCommitter);
+
+        // test operator name length more than 80 and contains with : Writer
+        final String operatorNameMoreAndContainsWriter =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong: WriterOperatorName";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameMoreAndContainsWriter))
+                .isEqualTo(expectedOperatorNameMore);
+
+        // test operator name length more than 80 and contains with : Committer
+        final String operatorNameMoreAndContainsCommitter =
+                "testLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLong: CommitterOperatorName";
+        assertThat(MetricUtils.truncateOperatorName(operatorNameMoreAndContainsCommitter))
+                .isEqualTo(expectedOperatorNameMore);
+    }
+
     // --------------- utility methods and classes ---------------
 
     /**
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index 404f65f0ee6..c47de118bb7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.metrics.Metric;
@@ -28,6 +29,7 @@ import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.testutils.InMemoryReporter;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -55,9 +57,8 @@ import static org.hamcrest.Matchers.hasSize;
 /** Tests whether all provided metrics of a {@link Sink} are of the expected values (FLIP-33). */
 public class SinkMetricsITCase extends TestLogger {
 
-    private static final String TEST_SINK_NAME = "MetricTestSink";
-    // please refer to SinkTransformationTranslator#WRITER_NAME
-    private static final String DEFAULT_WRITER_NAME = "Writer";
+    private static final String TEST_LONG_SINK_NAME =
+            
"LongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongLongMetricTestSink";
     private static final int DEFAULT_PARALLELISM = 4;
 
     @Rule public final SharedObjects sharedObjects = SharedObjects.create();
@@ -103,7 +104,7 @@ public class SinkMetricsITCase extends TestLogger {
                             return i;
                         })
                 .sinkTo(TestSinkV2.<Long>newBuilder().setWriter(new MetricWriter()).build())
-                .name(TEST_SINK_NAME);
+                .name(TEST_LONG_SINK_NAME);
         JobClient jobClient = env.executeAsync();
         final JobID jobId = jobClient.getJobID();
 
@@ -123,7 +124,9 @@ public class SinkMetricsITCase extends TestLogger {
             JobID jobId, long processedRecordsPerSubtask, int parallelism, int numSplits) {
         List<OperatorMetricGroup> groups =
                 reporter.findOperatorMetricGroups(
-                        jobId, TEST_SINK_NAME + ": " + DEFAULT_WRITER_NAME);
+                        jobId,
+                        MetricUtils.truncateOperatorName(
+                                TEST_LONG_SINK_NAME + ": " + ConfigConstants.WRITER_NAME));
         assertThat(groups, hasSize(parallelism));
 
         int subtaskWithMetrics = 0;
@@ -160,7 +163,7 @@ public class SinkMetricsITCase extends TestLogger {
 
         // Test operator I/O metrics are reused by task metrics
         List<TaskMetricGroup> taskMetricGroups =
-                reporter.findTaskMetricGroups(jobId, TEST_SINK_NAME);
+                reporter.findTaskMetricGroups(jobId, TEST_LONG_SINK_NAME);
         assertThat(taskMetricGroups, hasSize(parallelism));
 
         int subtaskWithTaskMetrics = 0;

Reply via email to