This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 91c2b8d56bae03c2ce4a50b5c014cea842df9f74
Author: Jing Ge <[email protected]>
AuthorDate: Thu Feb 17 23:33:35 2022 +0100

    [FLINK-26126][metrics] Introduce new counter metrics for sending records by 
SinkWriter.
    
    We found that the new sink v2 interface reports a wrong numRecordsOut 
metric for the sink writers. We send a fixed number of records to the source, 
but the numRecordsOut of the sink keeps increasing over time.
    
    The problem is that both the SinkWriterOperator and the KafkaWriter 
use the same counter metric for counting outgoing records. The same records 
sent by the SinkWriterOperator to the post topology and written by the 
KafkaWriter to the downstream system are therefore counted twice in the same 
counter metric.
---
 .../metrics/groups/SinkWriterMetricGroup.java      | 14 ++++++++++
 .../apache/flink/runtime/metrics/MetricNames.java  |  2 ++
 .../groups/InternalSinkWriterMetricGroup.java      | 14 ++++++++++
 .../testframe/testsuites/SinkTestSuiteBase.java    | 14 +++++++---
 .../test/streaming/runtime/SinkMetricsITCase.java  | 31 +++++++++++++---------
 5 files changed, 60 insertions(+), 15 deletions(-)

diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
index 6a2e576..22e7c77 100644
--- 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
@@ -32,6 +32,20 @@ public interface SinkWriterMetricGroup extends 
OperatorMetricGroup {
     Counter getNumRecordsOutErrorsCounter();
 
     /**
+     * The total number of records that have been sent to the downstream system.
+     *
+     * <p>Note: this counter will count all records the SinkWriter sent. From 
SinkWriter's
+     * perspective, these records have been sent to the downstream system, but 
the downstream system
+     * may fail to persist them within its scope. 
Therefore, this count may
+     * include the number of records that the downstream system failed to write, 
which should
+     * be counted by {@link #getNumRecordsOutErrorsCounter()}.
+     */
+    Counter getNumRecordsSendCounter();
+
+    /** The total number of bytes sent since the task started. */
+    Counter getNumBytesSendCounter();
+
+    /**
      * Sets an optional gauge for the time it takes to send the last record.
      *
      * <p>This metric is an instantaneous value recorded for the last 
processed record.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 81228b2..800cf2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -86,6 +86,8 @@ public class MetricNames {
     // FLIP-33 sink
     public static final String NUM_RECORDS_OUT_ERRORS = "numRecordsOutErrors";
     public static final String CURRENT_SEND_TIME = "currentSendTime";
+    public static final String NUM_RECORDS_SEND = "numRecordsSend";
+    public static final String NUM_BYTES_SEND = "numBytesSend";
 
     // FLIP-33 source
     public static final String NUM_RECORDS_IN_ERRORS = "numRecordsInErrors";
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
index 1ef995f..8bc99f5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
@@ -35,12 +35,16 @@ public class InternalSinkWriterMetricGroup extends 
ProxyMetricGroup<MetricGroup>
         implements SinkWriterMetricGroup {
 
     private final Counter numRecordsOutErrors;
+    private final Counter numRecordsWritten;
+    private final Counter numBytesWritten;
     private final OperatorIOMetricGroup operatorIOMetricGroup;
 
     private InternalSinkWriterMetricGroup(
             MetricGroup parentMetricGroup, OperatorIOMetricGroup 
operatorIOMetricGroup) {
         super(parentMetricGroup);
         numRecordsOutErrors = 
parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
+        numRecordsWritten = 
parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND);
+        numBytesWritten = 
parentMetricGroup.counter(MetricNames.NUM_BYTES_SEND);
         this.operatorIOMetricGroup = operatorIOMetricGroup;
     }
 
@@ -72,6 +76,16 @@ public class InternalSinkWriterMetricGroup extends 
ProxyMetricGroup<MetricGroup>
     }
 
     @Override
+    public Counter getNumRecordsSendCounter() {
+        return numRecordsWritten;
+    }
+
+    @Override
+    public Counter getNumBytesSendCounter() {
+        return numBytesWritten;
+    }
+
+    @Override
     public void setCurrentSendTimeGauge(Gauge<Long> currentSendTimeGauge) {
         parentMetricGroup.gauge(MetricNames.CURRENT_SEND_TIME, 
currentSendTimeGauge);
     }
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
index 2c13e83..e958e70 100644
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
@@ -405,6 +405,7 @@ public abstract class SinkTestSuiteBase<T extends 
Comparable<T>> {
                                     externalContext,
                                     jobClient.getJobID(),
                                     sinkName,
+                                    MetricNames.NUM_RECORDS_SEND,
                                     testRecords.size());
                         } catch (Exception e) {
                             // skip failed assert try
@@ -531,16 +532,23 @@ public abstract class SinkTestSuiteBase<T extends 
Comparable<T>> {
             DataStreamSinkExternalContext<T> context,
             JobID jobId,
             String sinkName,
-            long allRecordSize)
+            String metricsName,
+            long expectedSize)
             throws Exception {
         double sumNumRecordsOut =
                 metricQuerier.getAggregatedMetricsByRestAPI(
                         testEnv.getRestEndpoint(),
                         jobId,
                         sinkName,
-                        MetricNames.IO_NUM_RECORDS_OUT,
+                        metricsName,
                         getSinkMetricFilter(context));
-        return Precision.equals(allRecordSize, sumNumRecordsOut);
+
+        if (Precision.equals(expectedSize, sumNumRecordsOut)) {
+            return true;
+        } else {
+            LOG.info("expected:<{}> but was <{}>({})", expectedSize, 
sumNumRecordsOut, metricsName);
+            return false;
+        }
     }
 
     /** Sort the list. */
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index e2e4203..c39abe0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -53,7 +52,12 @@ import static org.hamcrest.Matchers.hasSize;
 
 /** Tests whether all provided metrics of a {@link Sink} are of the expected 
values (FLIP-33). */
 public class SinkMetricsITCase extends TestLogger {
+
+    private static final String TEST_SINK_NAME = "MetricTestSink";
+    // please refer to SinkTransformationTranslator#WRITER_NAME
+    private static final String DEFAULT_WRITER_NAME = "Writer";
     private static final int DEFAULT_PARALLELISM = 4;
+
     @Rule public final SharedObjects sharedObjects = SharedObjects.create();
     private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
 
@@ -96,8 +100,12 @@ public class SinkMetricsITCase extends TestLogger {
                             }
                             return i;
                         })
-                .sinkTo(TestSink.newBuilder().setWriter(new 
MetricWriter()).build())
-                .name("MetricTestSink");
+                .sinkTo(
+                        TestSink.newBuilder()
+                                .setDefaultCommitter()
+                                .setWriter(new MetricWriter())
+                                .build())
+                .name(TEST_SINK_NAME);
         JobClient jobClient = env.executeAsync();
         final JobID jobId = jobClient.getJobID();
 
@@ -115,23 +123,24 @@ public class SinkMetricsITCase extends TestLogger {
     private void assertSinkMetrics(
             JobID jobId, long processedRecordsPerSubtask, int parallelism, int 
numSplits) {
         List<OperatorMetricGroup> groups =
-                reporter.findOperatorMetricGroups(jobId, "MetricTestSink");
+                reporter.findOperatorMetricGroups(
+                        jobId, TEST_SINK_NAME + ": " + DEFAULT_WRITER_NAME);
         assertThat(groups, hasSize(parallelism));
 
         int subtaskWithMetrics = 0;
         for (OperatorMetricGroup group : groups) {
             Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
             // there are only 2 splits assigned; so two groups will not update 
metrics
-            if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() 
== 0) {
+            if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() 
!= 0) {
                 continue;
             }
             subtaskWithMetrics++;
-            // I/O metrics
+            // SinkWriterMetricGroup metrics
             assertThat(
-                    group.getIOMetricGroup().getNumRecordsOutCounter(),
+                    metrics.get(MetricNames.NUM_RECORDS_SEND),
                     isCounter(equalTo(processedRecordsPerSubtask)));
             assertThat(
-                    group.getIOMetricGroup().getNumBytesOutCounter(),
+                    metrics.get(MetricNames.NUM_BYTES_SEND),
                     isCounter(
                             equalTo(
                                     processedRecordsPerSubtask
@@ -156,12 +165,10 @@ public class SinkMetricsITCase extends TestLogger {
         static final long RECORD_SIZE_IN_BYTES = 10;
         private SinkWriterMetricGroup metricGroup;
         private long sendTime;
-        private Counter recordsOutCounter;
 
         @Override
         public void init(Sink.InitContext context) {
             this.metricGroup = context.metricGroup();
-            this.recordsOutCounter = 
metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
             metricGroup.setCurrentSendTimeGauge(() -> sendTime);
         }
 
@@ -169,11 +176,11 @@ public class SinkMetricsITCase extends TestLogger {
         public void write(Long element, Context context) {
             super.write(element, context);
             sendTime = element * BASE_SEND_TIME;
-            recordsOutCounter.inc();
+            metricGroup.getNumRecordsSendCounter().inc();
             if (element % 2 == 0) {
                 metricGroup.getNumRecordsOutErrorsCounter().inc();
             }
-            
metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
+            metricGroup.getNumBytesSendCounter().inc(RECORD_SIZE_IN_BYTES);
         }
     }
 }

Reply via email to