This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit c54d3b2d038143a5adcf8f22d901159eec3a4c2c
Author: GuojunLi <[email protected]>
AuthorDate: Mon Dec 11 10:35:11 2023 +0800

    [flink] Support source reader metrics when consumer-id is set (#2470)
---
 docs/content/maintenance/metrics.md                |  4 +++
 .../paimon/flink/source/operator/ReadOperator.java | 14 +++++++++-
 .../flink/source/operator/OperatorSourceTest.java  | 30 ++++++++++++++++++++++
 3 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/docs/content/maintenance/metrics.md 
b/docs/content/maintenance/metrics.md
index 6a8542f29..156b84785 100644
--- a/docs/content/maintenance/metrics.md
+++ b/docs/content/maintenance/metrics.md
@@ -359,6 +359,10 @@ When using Flink to read and write, Paimon has implemented some key standard Flink
     </tbody>
 </table>
 
+{{< hint info >}}
+Please note that if you specify `consumer-id` in your streaming query, the source metrics are reported at the level of the reader operator, which comes after the `Monitor` operator.
+{{< /hint >}}
+
 #### Sink Metrics (Flink)
 
 <table class="table table-bordered">
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index 0a1b3c511..fc41635ca 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -21,6 +21,8 @@ package org.apache.paimon.flink.source.operator;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
@@ -48,6 +50,8 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
     private transient FlinkRowData reuseRow;
     private transient IOManager ioManager;
 
+    private transient FileStoreSourceReaderMetrics sourceReaderMetrics;
+
     public ReadOperator(ReadBuilder readBuilder) {
         this.readBuilder = readBuilder;
     }
@@ -55,6 +59,7 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
     @Override
     public void open() throws Exception {
         super.open();
+        this.sourceReaderMetrics = new 
FileStoreSourceReaderMetrics(getMetricGroup());
         this.ioManager =
                 IOManager.create(
                         getContainingTask()
@@ -68,8 +73,15 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
 
     @Override
     public void processElement(StreamRecord<Split> record) throws Exception {
+        Split split = record.getValue();
+        // update metric when reading a new split
+        long eventTime =
+                ((DataSplit) split)
+                        .getLatestFileCreationEpochMillis()
+                        .orElse(FileStoreSourceReaderMetrics.UNDEFINED);
+        sourceReaderMetrics.recordSnapshotUpdate(eventTime);
         try (CloseableIterator<InternalRow> iterator =
-                read.createReader(record.getValue()).toCloseableIterator()) {
+                read.createReader(split).toCloseableIterator()) {
             while (iterator.hasNext()) {
                 reuseRow.replace(iterator.next());
                 output.collect(reuseRecord);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index d91fe9b49..044fd2182 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.utils.MetricUtils;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
@@ -32,6 +33,7 @@ import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataTypes;
 
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
@@ -177,6 +179,34 @@ public class OperatorSourceTest {
                         new StreamRecord<>(GenericRowData.of(2, 2, 2)));
     }
 
+    @Test
+    public void testReadOperatorMetricsRegisterAndUpdate() throws Exception {
+        ReadOperator readOperator = new ReadOperator(table.newReadBuilder());
+        OneInputStreamOperatorTestHarness<Split, RowData> harness =
+                new OneInputStreamOperatorTestHarness<>(readOperator);
+        harness.setup(
+                InternalSerializers.create(
+                        RowType.of(new IntType(), new IntType(), new 
IntType())));
+        writeToTable(1, 1, 1);
+        writeToTable(2, 2, 2);
+        List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+        assertThat(splits.size()).isGreaterThan(0);
+        MetricGroup readerOperatorMetricGroup = readOperator.getMetricGroup();
+        harness.open();
+        assertThat(
+                        MetricUtils.getGauge(readerOperatorMetricGroup, 
"currentFetchEventTimeLag")
+                                .getValue())
+                .isEqualTo(-1L);
+        harness.processElement(new StreamRecord<>(splits.get(0)));
+        assertThat(
+                        (Long)
+                                MetricUtils.getGauge(
+                                                readerOperatorMetricGroup,
+                                                "currentFetchEventTimeLag")
+                                        .getValue())
+                .isGreaterThan(0);
+    }
+
     private <T> T testReadSplit(
             MonitorFunction function,
             SupplierWithException<T, Exception> beforeClose,

Reply via email to