This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c90b2081 [Flink] Register scan metrics to flink metric groups
7c90b2081 is described below

commit 7c90b2081158901be13e806df3139b1d88d0250b
Author: GuojunLi <[email protected]>
AuthorDate: Tue Nov 21 09:54:44 2023 +0800

    [Flink] Register scan metrics to flink metric groups
    
    This closes #2260.
---
 .../table/source/AbstractInnerTableScan.java       |   6 +
 .../apache/paimon/table/source/InnerTableScan.java |   6 +
 .../apache/paimon/table/system/AuditLogTable.java  |  12 ++
 .../flink/source/ContinuousFileStoreSource.java    |   6 +
 .../flink/source/LogHybridSourceFactory.java       |   7 +
 .../paimon/flink/source/StaticFileStoreSource.java |  15 +-
 .../flink/source/FileStoreSourceMetricsTest.java   | 188 +++++++++++++++++++++
 .../org/apache/paimon/flink/utils/MetricUtils.java |   5 +
 8 files changed, 242 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 7c6bdbcc2..277e01a2e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -24,6 +24,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.Consumer;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
@@ -73,6 +74,11 @@ public abstract class AbstractInnerTableScan implements InnerTableScan {
         return this;
     }
 
+    public AbstractInnerTableScan withMetricsRegistry(MetricRegistry 
metricsRegistry) {
+        snapshotReader.withMetricRegistry(metricsRegistry);
+        return this;
+    }
+
     public CoreOptions options() {
         return options;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 3eca8442f..3433aad36 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.predicate.Predicate;
 
 import java.util.Map;
@@ -30,4 +31,9 @@ public interface InnerTableScan extends TableScan {
     default InnerTableScan withPartitionFilter(Map<String, String> 
partitionSpec) {
         return this;
     }
+
+    default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
+        // do nothing, should implement this if need
+        return this;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 7bc7e7dd2..990c71257 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -304,6 +304,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             return this;
         }
 
+        @Override
+        public InnerTableScan withMetricsRegistry(MetricRegistry 
metricsRegistry) {
+            batchScan.withMetricsRegistry(metricsRegistry);
+            return this;
+        }
+
         @Override
         public Plan plan() {
             return batchScan.plan();
@@ -370,6 +376,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {
             streamScan.notifyCheckpointComplete(nextSnapshot);
         }
+
+        @Override
+        public InnerStreamTableScan withMetricsRegistry(MetricRegistry 
metricsRegistry) {
+            streamScan.withMetricsRegistry(metricsRegistry);
+            return this;
+        }
     }
 
     private class AuditLogRead implements InnerTableRead {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index aa08a1dfd..98c1e6e1b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -19,7 +19,9 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.StreamTableScan;
 
@@ -73,6 +75,10 @@ public class ContinuousFileStoreSource extends FlinkSource {
             splits = checkpoint.splits();
         }
         StreamTableScan scan = readBuilder.newStreamScan();
+        if (context.metricGroup() != null) {
+            ((InnerStreamTableScan) scan)
+                    .withMetricsRegistry(new 
FlinkMetricRegistry(context.metricGroup()));
+        }
         scan.restore(nextSnapshotId);
         return buildEnumerator(context, splits, nextSnapshotId, scan);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
index ab9cf5ccf..7f6c7ac0d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
@@ -21,10 +21,12 @@ package org.apache.paimon.flink.source;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.log.LogSourceProvider;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.utils.SnapshotManager;
@@ -113,6 +115,11 @@ public class LogHybridSourceFactory
                 FileStoreSourceSplitGenerator splitGenerator = new 
FileStoreSourceSplitGenerator();
                 // get snapshot id and splits from scan
                 StreamTableScan scan = readBuilder.newStreamScan();
+                // register scan metrics
+                if (context.metricGroup() != null) {
+                    ((InnerStreamTableScan) scan)
+                            .withMetricsRegistry(new 
FlinkMetricRegistry(context.metricGroup()));
+                }
                 splits = splitGenerator.createSplits(scan.plan());
                 Long nextSnapshotId = scan.checkpoint();
                 if (nextSnapshotId != null) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index 10519baeb..af425aab5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -18,10 +18,13 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
 import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
 import org.apache.paimon.flink.source.assigners.SplitAssigner;
+import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -75,16 +78,22 @@ public class StaticFileStoreSource extends FlinkSource {
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
         Collection<FileStoreSourceSplit> splits =
-                checkpoint == null ? getSplits() : checkpoint.splits();
+                checkpoint == null ? getSplits(context) : checkpoint.splits();
         SplitAssigner splitAssigner =
                 createSplitAssigner(context, splitBatchSize, splitAssignMode, 
splits);
         return new StaticFileStoreSplitEnumerator(
                 context, null, splitAssigner, dynamicPartitionFilteringInfo);
     }
 
-    private List<FileStoreSourceSplit> getSplits() {
+    private List<FileStoreSourceSplit> getSplits(SplitEnumeratorContext 
context) {
         FileStoreSourceSplitGenerator splitGenerator = new 
FileStoreSourceSplitGenerator();
-        return splitGenerator.createSplits(readBuilder.newScan().plan());
+        TableScan scan = readBuilder.newScan();
+        // register scan metrics
+        if (context.metricGroup() != null) {
+            ((InnerTableScan) scan)
+                    .withMetricsRegistry(new 
FlinkMetricRegistry(context.metricGroup()));
+        }
+        return splitGenerator.createSplits(scan.plan());
     }
 
     public static SplitAssigner createSplitAssigner(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
new file mode 100644
index 000000000..27039a6ee
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.InnerTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.api.common.JobID;
+import 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import 
org.apache.flink.runtime.metrics.groups.InternalOperatorCoordinatorMetricGroup;
+import 
org.apache.flink.runtime.metrics.groups.InternalSplitEnumeratorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerOperatorMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for file store sources with metrics. */
+public class FileStoreSourceMetricsTest {
+    private FileStoreTable table;
+    private TestingSplitEnumeratorContextWithRegisteringGroup context;
+    private MetricGroup scanMetricGroup;
+
+    @BeforeEach
+    public void before(@TempDir java.nio.file.Path path) throws Exception {
+        FileIO fileIO = LocalFileIO.create();
+        Path tablePath = new Path(path.toString());
+        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+        TableSchema tableSchema =
+                schemaManager.createTable(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT())
+                                .column("b", DataTypes.BIGINT())
+                                .build());
+        table = FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+        context = new TestingSplitEnumeratorContextWithRegisteringGroup(1);
+        scanMetricGroup =
+                context.metricGroup()
+                        .addGroup("paimon")
+                        .addGroup("table", table.name())
+                        .addGroup("scan");
+    }
+
+    @Test
+    public void staticFileStoreSourceScanMetricsTest() throws Exception {
+        writeOnce();
+        StaticFileStoreSource staticFileStoreSource =
+                new StaticFileStoreSource(
+                        table.newReadBuilder(),
+                        null,
+                        1,
+                        FlinkConnectorOptions.SplitAssignMode.FAIR);
+        staticFileStoreSource.restoreEnumerator(context, null);
+        assertThat(MetricUtils.getGauge(scanMetricGroup, 
"lastScannedManifests").getValue())
+                .isEqualTo(1L);
+        assertThat(MetricUtils.getGauge(scanMetricGroup, 
"lastScanResultedTableFiles").getValue())
+                .isEqualTo(1L);
+    }
+
+    @Test
+    public void continuousFileStoreSourceScanMetricsTest() throws Exception {
+        writeOnce();
+        ContinuousFileStoreSource continuousFileStoreSource =
+                new ContinuousFileStoreSource(table.newReadBuilder(), 
table.options(), null);
+        ContinuousFileSplitEnumerator enumerator =
+                (ContinuousFileSplitEnumerator)
+                        continuousFileStoreSource.restoreEnumerator(context, 
null);
+        enumerator.scanNextSnapshot();
+        assertThat(MetricUtils.getHistogram(scanMetricGroup, 
"scanDuration").getCount())
+                .isEqualTo(1);
+        assertThat(MetricUtils.getGauge(scanMetricGroup, 
"lastScannedManifests").getValue())
+                .isEqualTo(1L);
+        assertThat(MetricUtils.getGauge(scanMetricGroup, 
"lastScanResultedTableFiles").getValue())
+                .isEqualTo(1L);
+
+        writeAgain();
+        enumerator.scanNextSnapshot();
+        assertThat(MetricUtils.getHistogram(scanMetricGroup, 
"scanDuration").getCount())
+                .isEqualTo(2);
+        assertThat(MetricUtils.getGauge(scanMetricGroup, 
"lastScannedManifests").getValue())
+                .isEqualTo(1L);
+        assertThat(MetricUtils.getGauge(scanMetricGroup, 
"lastScanResultedTableFiles").getValue())
+                .isEqualTo(1L);
+    }
+
+    @Test
+    public void logHybridFileStoreSourceScanMetricsTest() throws Exception {
+        writeOnce();
+        FlinkSource logHybridFileStoreSource =
+                LogHybridSourceFactory.buildHybridFirstSource(table, null, 
null);
+        logHybridFileStoreSource.restoreEnumerator(context, null);
+        assertThat(MetricUtils.getGauge(scanMetricGroup, 
"lastScannedManifests").getValue())
+                .isEqualTo(1L);
+        assertThat(MetricUtils.getGauge(scanMetricGroup, 
"lastScanResultedTableFiles").getValue())
+                .isEqualTo(1L);
+    }
+
+    private void writeOnce() throws Exception {
+        InnerTableWrite writer = table.newWrite("test");
+        TableCommitImpl commit = table.newCommit("test");
+        writer.write(GenericRow.of(1, 2L));
+        writer.write(GenericRow.of(3, 4L));
+        writer.write(GenericRow.of(5, 6L));
+        writer.write(GenericRow.of(7, 8L));
+        writer.write(GenericRow.of(9, 10L));
+        commit.commit(writer.prepareCommit());
+
+        commit.close();
+        writer.close();
+    }
+
+    private void writeAgain() throws Exception {
+        InnerTableWrite writer = table.newWrite("test");
+        TableCommitImpl commit = table.newCommit("test");
+        writer.write(GenericRow.of(10, 2L));
+        writer.write(GenericRow.of(13, 24L));
+        writer.write(GenericRow.of(15, 26L));
+        writer.write(GenericRow.of(17, 28L));
+        writer.write(GenericRow.of(19, 10L));
+        commit.commit(writer.prepareCommit());
+
+        commit.close();
+        writer.close();
+    }
+
+    private class TestingSplitEnumeratorContextWithRegisteringGroup
+            extends TestingSplitEnumeratorContext<FileStoreSourceSplit> {
+        private final SplitEnumeratorMetricGroup metricGroup;
+
+        public TestingSplitEnumeratorContextWithRegisteringGroup(int 
parallelism) {
+            super(parallelism);
+            final JobID jobId = new JobID();
+            final JobVertexID jobVertexId = new JobVertexID();
+            final OperatorID operatorId = new OperatorID();
+            final MetricRegistry registry = 
TestingMetricRegistry.builder().build();
+            JobManagerOperatorMetricGroup jmJobGroup =
+                    
JobManagerMetricGroup.createJobManagerMetricGroup(registry, "localhost")
+                            .addJob(jobId, "myJobName")
+                            .getOrAddOperator(jobVertexId, "taskName", 
operatorId, "opName");
+            InternalOperatorCoordinatorMetricGroup 
operatorCoordinatorMetricGroup =
+                    new InternalOperatorCoordinatorMetricGroup(jmJobGroup);
+            InternalSplitEnumeratorMetricGroup splitEnumeratorMetricGroup =
+                    new 
InternalSplitEnumeratorMetricGroup(operatorCoordinatorMetricGroup);
+            this.metricGroup = splitEnumeratorMetricGroup;
+        }
+
+        @Override
+        public SplitEnumeratorMetricGroup metricGroup() {
+            return this.metricGroup;
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
index 93451a1f7..a8ed37cc0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.utils;
 
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
@@ -33,6 +34,10 @@ public class MetricUtils {
         return (Gauge<?>) getMetric(group, metricName);
     }
 
+    public static Histogram getHistogram(MetricGroup group, String metricName) 
{
+        return (Histogram) getMetric(group, metricName);
+    }
+
     @SuppressWarnings("unchecked")
     private static Metric getMetric(MetricGroup group, String metricName) {
         try {

Reply via email to