This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.6 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit c54d3b2d038143a5adcf8f22d901159eec3a4c2c Author: GuojunLi <[email protected]> AuthorDate: Mon Dec 11 10:35:11 2023 +0800 [flink] Support source reader metrics when consumer-id is set (#2470) --- docs/content/maintenance/metrics.md | 4 +++ .../paimon/flink/source/operator/ReadOperator.java | 14 +++++++++- .../flink/source/operator/OperatorSourceTest.java | 30 ++++++++++++++++++++++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/docs/content/maintenance/metrics.md b/docs/content/maintenance/metrics.md index 6a8542f29..156b84785 100644 --- a/docs/content/maintenance/metrics.md +++ b/docs/content/maintenance/metrics.md @@ -359,6 +359,10 @@ When using Flink to read and write, Paimon has implemented some key standard Fli </tbody> </table> +{{< hint info >}} +Please note that if you specify `consumer-id` in your streaming query, the source metrics are reported at the level of the reader operator, which is behind the `Monitor` operator. +{{< /hint >}} + #### Sink Metrics (Flink) <table class="table table-bordered"> diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index 0a1b3c511..fc41635ca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -21,6 +21,8 @@ package org.apache.paimon.flink.source.operator; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.flink.FlinkRowData; +import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; @@ 
-48,6 +50,8 @@ public class ReadOperator extends AbstractStreamOperator<RowData> private transient FlinkRowData reuseRow; private transient IOManager ioManager; + private transient FileStoreSourceReaderMetrics sourceReaderMetrics; + public ReadOperator(ReadBuilder readBuilder) { this.readBuilder = readBuilder; } @@ -55,6 +59,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData> @Override public void open() throws Exception { super.open(); + this.sourceReaderMetrics = new FileStoreSourceReaderMetrics(getMetricGroup()); this.ioManager = IOManager.create( getContainingTask() @@ -68,8 +73,15 @@ public class ReadOperator extends AbstractStreamOperator<RowData> @Override public void processElement(StreamRecord<Split> record) throws Exception { + Split split = record.getValue(); + // update metric when reading a new split + long eventTime = + ((DataSplit) split) + .getLatestFileCreationEpochMillis() + .orElse(FileStoreSourceReaderMetrics.UNDEFINED); + sourceReaderMetrics.recordSnapshotUpdate(eventTime); try (CloseableIterator<InternalRow> iterator = - read.createReader(record.getValue()).toCloseableIterator()) { + read.createReader(split).toCloseableIterator()) { while (iterator.hasNext()) { reuseRow.replace(iterator.next()); output.collect(reuseRecord); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java index d91fe9b49..044fd2182 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.GenericRow; 
+import org.apache.paimon.flink.utils.MetricUtils; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchTableCommit; @@ -32,6 +33,7 @@ import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.DataTypes; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; @@ -177,6 +179,34 @@ public class OperatorSourceTest { new StreamRecord<>(GenericRowData.of(2, 2, 2))); } + @Test + public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { + ReadOperator readOperator = new ReadOperator(table.newReadBuilder()); + OneInputStreamOperatorTestHarness<Split, RowData> harness = + new OneInputStreamOperatorTestHarness<>(readOperator); + harness.setup( + InternalSerializers.create( + RowType.of(new IntType(), new IntType(), new IntType()))); + writeToTable(1, 1, 1); + writeToTable(2, 2, 2); + List<Split> splits = table.newReadBuilder().newScan().plan().splits(); + assertThat(splits.size()).isGreaterThan(0); + MetricGroup readerOperatorMetricGroup = readOperator.getMetricGroup(); + harness.open(); + assertThat( + MetricUtils.getGauge(readerOperatorMetricGroup, "currentFetchEventTimeLag") + .getValue()) + .isEqualTo(-1L); + harness.processElement(new StreamRecord<>(splits.get(0))); + assertThat( + (Long) + MetricUtils.getGauge( + readerOperatorMetricGroup, + "currentFetchEventTimeLag") + .getValue()) + .isGreaterThan(0); + } + private <T> T testReadSplit( MonitorFunction function, SupplierWithException<T, Exception> beforeClose,
