This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7c90b2081 [Flink] Register scan metrics to flink metric groups
7c90b2081 is described below
commit 7c90b2081158901be13e806df3139b1d88d0250b
Author: GuojunLi <[email protected]>
AuthorDate: Tue Nov 21 09:54:44 2023 +0800
[Flink] Register scan metrics to flink metric groups
This closes #2260.
---
.../table/source/AbstractInnerTableScan.java | 6 +
.../apache/paimon/table/source/InnerTableScan.java | 6 +
.../apache/paimon/table/system/AuditLogTable.java | 12 ++
.../flink/source/ContinuousFileStoreSource.java | 6 +
.../flink/source/LogHybridSourceFactory.java | 7 +
.../paimon/flink/source/StaticFileStoreSource.java | 15 +-
.../flink/source/FileStoreSourceMetricsTest.java | 188 +++++++++++++++++++++
.../org/apache/paimon/flink/utils/MetricUtils.java | 5 +
8 files changed, 242 insertions(+), 3 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 7c6bdbcc2..277e01a2e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -24,6 +24,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
@@ -73,6 +74,13 @@ public abstract class AbstractInnerTableScan implements
InnerTableScan {
return this;
}
+ /** Forwards {@code metricsRegistry} to the snapshot reader so it can register scan metrics. */
+ @Override
+ public AbstractInnerTableScan withMetricsRegistry(MetricRegistry
metricsRegistry) {
+ snapshotReader.withMetricRegistry(metricsRegistry);
+ return this;
+ }
+
public CoreOptions options() {
return options;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 3eca8442f..3433aad36 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.Predicate;
import java.util.Map;
@@ -30,4 +31,9 @@ public interface InnerTableScan extends TableScan {
default InnerTableScan withPartitionFilter(Map<String, String>
partitionSpec) {
return this;
}
+
+ default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
+ // No-op by default; implementations that can report scan metrics should override this.
+ return this;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 7bc7e7dd2..990c71257 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -304,6 +304,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return this;
}
+ @Override
+ public InnerTableScan withMetricsRegistry(MetricRegistry
metricsRegistry) {
+ batchScan.withMetricsRegistry(metricsRegistry); // forward to the wrapped batch scan
+ return this;
+ }
+
@Override
public Plan plan() {
return batchScan.plan();
@@ -370,6 +376,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {
streamScan.notifyCheckpointComplete(nextSnapshot);
}
+
+ @Override
+ public InnerStreamTableScan withMetricsRegistry(MetricRegistry
metricsRegistry) {
+ streamScan.withMetricsRegistry(metricsRegistry); // forward to the wrapped stream scan
+ return this;
+ }
}
private class AuditLogRead implements InnerTableRead {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index aa08a1dfd..98c1e6e1b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -19,7 +19,9 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;
@@ -73,6 +75,11 @@ public class ContinuousFileStoreSource extends FlinkSource {
splits = checkpoint.splits();
}
StreamTableScan scan = readBuilder.newStreamScan();
+ // register scan metrics; skipped for scans that cannot report them
+ if (context.metricGroup() != null && scan instanceof InnerStreamTableScan) {
+ ((InnerStreamTableScan) scan)
+ .withMetricsRegistry(new
FlinkMetricRegistry(context.metricGroup()));
+ }
scan.restore(nextSnapshotId);
return buildEnumerator(context, splits, nextSnapshotId, scan);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
index ab9cf5ccf..7f6c7ac0d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
@@ -21,10 +21,12 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.log.LogSourceProvider;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.utils.SnapshotManager;
@@ -113,6 +115,11 @@ public class LogHybridSourceFactory
FileStoreSourceSplitGenerator splitGenerator = new
FileStoreSourceSplitGenerator();
// get snapshot id and splits from scan
StreamTableScan scan = readBuilder.newStreamScan();
+ // register scan metrics; skipped for scans that cannot report them
+ if (context.metricGroup() != null && scan instanceof InnerStreamTableScan) {
+ ((InnerStreamTableScan) scan)
+ .withMetricsRegistry(new
FlinkMetricRegistry(context.metricGroup()));
+ }
splits = splitGenerator.createSplits(scan.plan());
Long nextSnapshotId = scan.checkpoint();
if (nextSnapshotId != null) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index 10519baeb..af425aab5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -18,10 +18,13 @@
package org.apache.paimon.flink.source;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
+import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -75,16 +78,24 @@ public class StaticFileStoreSource extends FlinkSource {
SplitEnumeratorContext<FileStoreSourceSplit> context,
PendingSplitsCheckpoint checkpoint) {
Collection<FileStoreSourceSplit> splits =
- checkpoint == null ? getSplits() : checkpoint.splits();
+ checkpoint == null ? getSplits(context) : checkpoint.splits();
SplitAssigner splitAssigner =
createSplitAssigner(context, splitBatchSize, splitAssignMode,
splits);
return new StaticFileStoreSplitEnumerator(
context, null, splitAssigner, dynamicPartitionFilteringInfo);
}
- private List<FileStoreSourceSplit> getSplits() {
+ /** Plans a new scan from the read builder and converts the plan into source splits. */
+ private List<FileStoreSourceSplit> getSplits(
+ SplitEnumeratorContext<FileStoreSourceSplit> context) {
FileStoreSourceSplitGenerator splitGenerator = new
FileStoreSourceSplitGenerator();
- return splitGenerator.createSplits(readBuilder.newScan().plan());
+ TableScan scan = readBuilder.newScan();
+ // register scan metrics; skipped for scans that cannot report them
+ if (context.metricGroup() != null && scan instanceof InnerTableScan) {
+ ((InnerTableScan) scan)
+ .withMetricsRegistry(new
FlinkMetricRegistry(context.metricGroup()));
+ }
+ return splitGenerator.createSplits(scan.plan());
}
public static SplitAssigner createSplitAssigner(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
new file mode 100644
index 000000000..27039a6ee
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.InnerTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.api.common.JobID;
+import
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import
org.apache.flink.runtime.metrics.groups.InternalOperatorCoordinatorMetricGroup;
+import
org.apache.flink.runtime.metrics.groups.InternalSplitEnumeratorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.JobManagerOperatorMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for file store sources with metrics. */
+public class FileStoreSourceMetricsTest {
+ private FileStoreTable table;
+ private TestingSplitEnumeratorContextWithRegisteringGroup context;
+ private MetricGroup scanMetricGroup;
+
+ @BeforeEach
+ public void before(@TempDir java.nio.file.Path path) throws Exception {
+ FileIO fileIO = LocalFileIO.create();
+ Path tablePath = new Path(path.toString());
+ SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+ TableSchema tableSchema =
+ schemaManager.createTable(
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.BIGINT())
+ .build());
+ table = FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+ context = new TestingSplitEnumeratorContextWithRegisteringGroup(1);
+ scanMetricGroup =
+ context.metricGroup()
+ .addGroup("paimon")
+ .addGroup("table", table.name())
+ .addGroup("scan");
+ }
+
+ @Test
+ public void staticFileStoreSourceScanMetricsTest() throws Exception {
+ writeOnce();
+ StaticFileStoreSource staticFileStoreSource =
+ new StaticFileStoreSource(
+ table.newReadBuilder(),
+ null,
+ 1,
+ FlinkConnectorOptions.SplitAssignMode.FAIR);
+ staticFileStoreSource.restoreEnumerator(context, null);
+ assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScannedManifests").getValue())
+ .isEqualTo(1L);
+ assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScanResultedTableFiles").getValue())
+ .isEqualTo(1L);
+ }
+
+ @Test
+ public void continuousFileStoreSourceScanMetricsTest() throws Exception {
+ writeOnce();
+ ContinuousFileStoreSource continuousFileStoreSource =
+ new ContinuousFileStoreSource(table.newReadBuilder(),
table.options(), null);
+ ContinuousFileSplitEnumerator enumerator =
+ (ContinuousFileSplitEnumerator)
+ continuousFileStoreSource.restoreEnumerator(context,
null);
+ enumerator.scanNextSnapshot();
+ assertThat(MetricUtils.getHistogram(scanMetricGroup,
"scanDuration").getCount())
+ .isEqualTo(1);
+ assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScannedManifests").getValue())
+ .isEqualTo(1L);
+ assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScanResultedTableFiles").getValue())
+ .isEqualTo(1L);
+
+ writeAgain();
+ enumerator.scanNextSnapshot();
+ assertThat(MetricUtils.getHistogram(scanMetricGroup,
"scanDuration").getCount())
+ .isEqualTo(2);
+ assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScannedManifests").getValue())
+ .isEqualTo(1L);
+ assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScanResultedTableFiles").getValue())
+ .isEqualTo(1L);
+ }
+
+ @Test
+ public void logHybridFileStoreSourceScanMetricsTest() throws Exception {
+ writeOnce();
+ FlinkSource logHybridFileStoreSource =
+ LogHybridSourceFactory.buildHybridFirstSource(table, null,
null);
+ logHybridFileStoreSource.restoreEnumerator(context, null);
+ assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScannedManifests").getValue())
+ .isEqualTo(1L);
+ assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScanResultedTableFiles").getValue())
+ .isEqualTo(1L);
+ }
+
+ private void writeOnce() throws Exception {
+ InnerTableWrite writer = table.newWrite("test");
+ TableCommitImpl commit = table.newCommit("test");
+ writer.write(GenericRow.of(1, 2L));
+ writer.write(GenericRow.of(3, 4L));
+ writer.write(GenericRow.of(5, 6L));
+ writer.write(GenericRow.of(7, 8L));
+ writer.write(GenericRow.of(9, 10L));
+ commit.commit(writer.prepareCommit());
+
+ commit.close();
+ writer.close();
+ }
+
+ private void writeAgain() throws Exception {
+ InnerTableWrite writer = table.newWrite("test");
+ TableCommitImpl commit = table.newCommit("test");
+ writer.write(GenericRow.of(10, 2L));
+ writer.write(GenericRow.of(13, 24L));
+ writer.write(GenericRow.of(15, 26L));
+ writer.write(GenericRow.of(17, 28L));
+ writer.write(GenericRow.of(19, 10L));
+ commit.commit(writer.prepareCommit());
+
+ commit.close();
+ writer.close();
+ }
+
+ /** Enumerator context backed by a real {@link InternalSplitEnumeratorMetricGroup} so registered metrics can be inspected. */
+ private static class TestingSplitEnumeratorContextWithRegisteringGroup
+ extends TestingSplitEnumeratorContext<FileStoreSourceSplit> {
+ private final SplitEnumeratorMetricGroup metricGroup;
+
+ public TestingSplitEnumeratorContextWithRegisteringGroup(int
parallelism) {
+ super(parallelism);
+ final JobID jobId = new JobID();
+ final JobVertexID jobVertexId = new JobVertexID();
+ final OperatorID operatorId = new OperatorID();
+ final MetricRegistry registry =
TestingMetricRegistry.builder().build();
+ JobManagerOperatorMetricGroup jmJobGroup =
+
JobManagerMetricGroup.createJobManagerMetricGroup(registry, "localhost")
+ .addJob(jobId, "myJobName")
+ .getOrAddOperator(jobVertexId, "taskName",
operatorId, "opName");
+ InternalOperatorCoordinatorMetricGroup
operatorCoordinatorMetricGroup =
+ new InternalOperatorCoordinatorMetricGroup(jmJobGroup);
+ InternalSplitEnumeratorMetricGroup splitEnumeratorMetricGroup =
+ new
InternalSplitEnumeratorMetricGroup(operatorCoordinatorMetricGroup);
+ this.metricGroup = splitEnumeratorMetricGroup;
+ }
+
+ @Override
+ public SplitEnumeratorMetricGroup metricGroup() {
+ return this.metricGroup;
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
index 93451a1f7..a8ed37cc0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.utils;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
@@ -33,6 +34,10 @@ public class MetricUtils {
return (Gauge<?>) getMetric(group, metricName);
}
+ public static Histogram getHistogram(MetricGroup group, String metricName)
{
+ return (Histogram) getMetric(group, metricName); // ClassCastException if the metric is not a Histogram
+ }
+
@SuppressWarnings("unchecked")
private static Metric getMetric(MetricGroup group, String metricName) {
try {