This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 683a245cf [Core] support scan metrics (#2170)
683a245cf is described below
commit 683a245cfd88a82a36ea9fad804aebe521b354d9
Author: GuojunLi <[email protected]>
AuthorDate: Wed Nov 1 18:41:04 2023 +0800
[Core] support scan metrics (#2170)
This closes #2170.
---
.../paimon/operation/AbstractFileStoreScan.java | 50 ++++++-
.../org/apache/paimon/operation/FileStoreScan.java | 3 +
.../paimon/operation/metrics/ScanMetrics.java | 95 +++++++++++++
.../apache/paimon/operation/metrics/ScanStats.java | 87 ++++++++++++
.../paimon/table/AbstractFileStoreTable.java | 3 +-
.../table/source/snapshot/SnapshotReader.java | 3 +
.../table/source/snapshot/SnapshotReaderImpl.java | 14 +-
.../apache/paimon/table/system/AuditLogTable.java | 7 +
.../paimon/operation/metrics/ScanMetricsTest.java | 153 +++++++++++++++++++++
9 files changed, 406 insertions(+), 9 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index dd638a187..8bd1fa2a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -27,6 +27,8 @@ import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.metrics.ScanMetrics;
+import org.apache.paimon.operation.metrics.ScanStats;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
@@ -49,6 +51,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -80,6 +83,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private ManifestCacheFilter manifestCacheFilter = null;
private final Integer scanManifestParallelism;
+ private ScanMetrics scanMetrics = null;
+
public AbstractFileStoreScan(
RowType partitionType,
ScanBucketFilter bucketKeyFilter,
@@ -188,6 +193,12 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
return this;
}
+    /**
+     * Attaches the sink that scan planning reports its {@link ScanStats} to. When no metrics are
+     * attached ({@code null}, the initial value), planning skips reporting entirely.
+     */
+    @Override
+    public FileStoreScan withMetrics(ScanMetrics metrics) {
+        scanMetrics = metrics;
+        return this;
+    }
+
@Override
public Plan plan() {
@@ -223,6 +234,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private Pair<Snapshot, List<ManifestEntry>> doPlan(
Function<ManifestFileMeta, List<ManifestEntry>> readManifest) {
+ long started = System.nanoTime();
List<ManifestFileMeta> manifests = specifiedManifests;
Snapshot snapshot = null;
if (manifests == null) {
@@ -237,19 +249,29 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
}
}
+ long startDataFiles =
+ manifests.stream().mapToLong(f -> f.numAddedFiles() +
f.numDeletedFiles()).sum();
+
+ AtomicLong cntEntries = new AtomicLong(0);
Iterable<ManifestEntry> entries =
ParallellyExecuteUtils.parallelismBatchIterable(
- files ->
- files.parallelStream()
- .filter(this::filterManifestFileMeta)
- .flatMap(m ->
readManifest.apply(m).stream())
- .filter(this::filterByStats)
- .collect(Collectors.toList()),
+ files -> {
+ List<ManifestEntry> entryList =
+ files.parallelStream()
+
.filter(this::filterManifestFileMeta)
+ .flatMap(m ->
readManifest.apply(m).stream())
+ .filter(this::filterByStats)
+ .collect(Collectors.toList());
+ cntEntries.getAndAdd(entryList.size());
+ return entryList;
+ },
manifests,
scanManifestParallelism);
List<ManifestEntry> files = new ArrayList<>();
- for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) {
+ Collection<ManifestEntry> mergedEntries =
ManifestEntry.mergeEntries(entries);
+ long skippedByPartitionAndStats = startDataFiles - cntEntries.get();
+ for (ManifestEntry file : mergedEntries) {
if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
String partInfo =
partitionType.getFieldCount() > 0
@@ -278,6 +300,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
}
}
+ long afterBucketFilter = files.size();
+ long skippedByBucketAndLevelFilter = mergedEntries.size() -
files.size();
// We group files by bucket here, and filter them by the whole bucket
filter.
// Why do this: because in primary key table, we can't just filter the
value
// by the stat in files (see
`PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
@@ -296,6 +320,18 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
.flatMap(Collection::stream)
.collect(Collectors.toList());
+ long skippedByWholeBucketFiles = afterBucketFilter - files.size();
+ long scanDuration = (System.nanoTime() - started) / 1_000_000;
+ if (scanMetrics != null) {
+ scanMetrics.reportScan(
+ new ScanStats(
+ scanDuration,
+ manifests.size(),
+ skippedByPartitionAndStats,
+ skippedByBucketAndLevelFilter,
+ skippedByWholeBucketFiles,
+ files.size()));
+ }
return Pair.of(snapshot, files);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 9fce20710..0d11fa605 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.Filter;
@@ -62,6 +63,8 @@ public interface FileStoreScan {
FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);
+ /** Sets the {@link ScanMetrics} that statistics of each planned scan are reported to. */
+ FileStoreScan withMetrics(ScanMetrics metrics);
+
/** Produce a {@link Plan}. */
Plan plan();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
new file mode 100644
index 000000000..f67bde96c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricRegistry;
+
+/**
+ * Metrics to measure scan operation.
+ *
+ * <p>Exposes one gauge per field of the most recently reported {@link ScanStats}. Each gauge reads
+ * 0 until the first report. Reported durations also feed a histogram.
+ */
+public class ScanMetrics {
+
+    private static final int HISTOGRAM_WINDOW_SIZE = 10_000;
+
+    @VisibleForTesting protected static final String GROUP_NAME = "scan";
+
+    @VisibleForTesting static final String LAST_SCAN_DURATION = "lastScanDuration";
+    @VisibleForTesting static final String SCAN_DURATION = "scanDuration";
+    @VisibleForTesting static final String LAST_SCANNED_MANIFESTS = "lastScannedManifests";
+
+    @VisibleForTesting
+    static final String LAST_SKIPPED_BY_PARTITION_AND_STATS = "lastSkippedByPartitionAndStats";
+
+    @VisibleForTesting
+    static final String LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER =
+            "lastSkippedByBucketAndLevelFilter";
+
+    @VisibleForTesting
+    static final String LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER =
+            "lastSkippedByWholeBucketFilesFilter";
+
+    @VisibleForTesting
+    static final String LAST_SCAN_SKIPPED_TABLE_FILES = "lastScanSkippedTableFiles";
+
+    @VisibleForTesting
+    static final String LAST_SCAN_RESULTED_TABLE_FILES = "lastScanResultedTableFiles";
+
+    private final MetricGroup metricGroup;
+
+    /** Histogram of reported scan durations (window size {@link #HISTOGRAM_WINDOW_SIZE}). */
+    private final Histogram durationHistogram;
+
+    /**
+     * Statistics of the most recently reported scan, or {@code null} before the first report.
+     *
+     * <p>{@code volatile} because {@link #reportScan} writes it while the gauge callbacks
+     * registered in {@link #registerGenericScanMetrics()} read it. Those callbacks are invoked by
+     * whoever polls the metric group, which may be a different thread than the scanning one.
+     */
+    private volatile ScanStats latestScan;
+
+    /**
+     * Creates the table-scoped {@code "scan"} metric group in {@code registry} and registers all
+     * scan metrics in it.
+     */
+    public ScanMetrics(MetricRegistry registry, String tableName) {
+        this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
+        this.durationHistogram = metricGroup.histogram(SCAN_DURATION, HISTOGRAM_WINDOW_SIZE);
+        registerGenericScanMetrics();
+    }
+
+    @VisibleForTesting
+    public MetricGroup getMetricGroup() {
+        return metricGroup;
+    }
+
+    /** Registers one gauge per {@link ScanStats} field, each reading 0 until the first report. */
+    private void registerGenericScanMetrics() {
+        metricGroup.gauge(
+                LAST_SCAN_DURATION, () -> latestScan == null ? 0L : latestScan.getDuration());
+        metricGroup.gauge(
+                LAST_SCANNED_MANIFESTS,
+                () -> latestScan == null ? 0L : latestScan.getScannedManifests());
+        metricGroup.gauge(
+                LAST_SKIPPED_BY_PARTITION_AND_STATS,
+                () -> latestScan == null ? 0L : latestScan.getSkippedByPartitionAndStats());
+        metricGroup.gauge(
+                LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER,
+                () -> latestScan == null ? 0L : latestScan.getSkippedByBucketAndLevelFilter());
+        metricGroup.gauge(
+                LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER,
+                () -> latestScan == null ? 0L : latestScan.getSkippedByWholeBucketFiles());
+        metricGroup.gauge(
+                LAST_SCAN_SKIPPED_TABLE_FILES,
+                () -> latestScan == null ? 0L : latestScan.getSkippedTableFiles());
+        metricGroup.gauge(
+                LAST_SCAN_RESULTED_TABLE_FILES,
+                () -> latestScan == null ? 0L : latestScan.getResultedTableFiles());
+    }
+
+    /**
+     * Records the statistics of a finished scan. The gauges start exposing them, and the duration
+     * is added to the duration histogram.
+     */
+    public void reportScan(ScanStats scanStats) {
+        latestScan = scanStats;
+        durationHistogram.update(scanStats.getDuration());
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
new file mode 100644
index 000000000..b94643459
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+
+/**
+ * Immutable statistics collected for a single scan operation.
+ *
+ * <p>The total number of skipped table files is not passed in. It is the sum of the three
+ * per-filter skip counters and is computed once in the constructor.
+ */
+public class ScanStats {
+
+    /** Scan duration, in whatever unit the caller supplies. */
+    private final long duration;
+
+    /** Number of manifest files the scan considered. */
+    private final long scannedManifests;
+
+    /** Files dropped by partition and statistics filtering. */
+    private final long skippedByPartitionAndStats;
+
+    /** Files dropped by bucket and level filtering. */
+    private final long skippedByBucketAndLevelFilter;
+
+    /** Files dropped by whole-bucket filtering. */
+    private final long skippedByWholeBucketFiles;
+
+    /** Sum of the three skip counters above. */
+    private final long skippedTableFiles;
+
+    /** Files remaining in the scan result. */
+    private final long resultedTableFiles;
+
+    public ScanStats(
+            long duration,
+            long scannedManifests,
+            long skippedByPartitionAndStats,
+            long skippedByBucketAndLevelFilter,
+            long skippedByWholeBucketFiles,
+            long resultedTableFiles) {
+        long totalSkipped = skippedByPartitionAndStats;
+        totalSkipped += skippedByBucketAndLevelFilter;
+        totalSkipped += skippedByWholeBucketFiles;
+
+        this.duration = duration;
+        this.scannedManifests = scannedManifests;
+        this.skippedByPartitionAndStats = skippedByPartitionAndStats;
+        this.skippedByBucketAndLevelFilter = skippedByBucketAndLevelFilter;
+        this.skippedByWholeBucketFiles = skippedByWholeBucketFiles;
+        this.skippedTableFiles = totalSkipped;
+        this.resultedTableFiles = resultedTableFiles;
+    }
+
+    @VisibleForTesting
+    protected long getDuration() {
+        return duration;
+    }
+
+    @VisibleForTesting
+    protected long getScannedManifests() {
+        return scannedManifests;
+    }
+
+    @VisibleForTesting
+    protected long getSkippedByPartitionAndStats() {
+        return skippedByPartitionAndStats;
+    }
+
+    @VisibleForTesting
+    protected long getSkippedByBucketAndLevelFilter() {
+        return skippedByBucketAndLevelFilter;
+    }
+
+    @VisibleForTesting
+    protected long getSkippedByWholeBucketFiles() {
+        return skippedByWholeBucketFiles;
+    }
+
+    @VisibleForTesting
+    protected long getSkippedTableFiles() {
+        return skippedTableFiles;
+    }
+
+    @VisibleForTesting
+    protected long getResultedTableFiles() {
+        return resultedTableFiles;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 596b62109..305a63b5f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -135,7 +135,8 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
snapshotManager(),
splitGenerator(),
nonPartitionFilterConsumer(),
- DefaultValueAssigner.create(tableSchema));
+ DefaultValueAssigner.create(tableSchema),
+ name());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index d53a7d2e3..ab55deba1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
@@ -57,6 +58,8 @@ public interface SnapshotReader {
SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);
+ /** Registers scan metrics for this reader in the given {@link MetricRegistry}. */
+ SnapshotReader withMetricRegistry(MetricRegistry registry);
+
/** Get splits plan from snapshot. */
Plan read();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 70673ca6d..aeb1143c3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -28,8 +28,10 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.TableSchema;
@@ -72,6 +74,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
private ScanMode scanMode = ScanMode.ALL;
private RecordComparator lazyPartitionComparator;
+ private final String tableName;
+
public SnapshotReaderImpl(
FileStoreScan scan,
TableSchema tableSchema,
@@ -79,7 +83,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
SnapshotManager snapshotManager,
SplitGenerator splitGenerator,
BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,
- DefaultValueAssigner defaultValueAssigner) {
+ DefaultValueAssigner defaultValueAssigner,
+ String tableName) {
+ this.tableName = tableName;
this.scan = scan;
this.tableSchema = tableSchema;
this.options = options;
@@ -173,6 +179,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
+    /**
+     * Creates a {@link ScanMetrics} for this reader's table in {@code registry} and attaches it to
+     * the underlying {@link FileStoreScan}.
+     *
+     * <p>NOTE(review): every call constructs a new {@code ScanMetrics}, which asks the registry for
+     * a new table metric group under the same name. Presumably this should be called at most once
+     * per reader; confirm with callers.
+     */
+    @Override
+    public SnapshotReader withMetricRegistry(MetricRegistry registry) {
+        ScanMetrics scanMetrics = new ScanMetrics(registry, tableName);
+        scan.withMetrics(scanMetrics);
+        return this;
+    }
+
/** Get splits from {@link FileKind#ADD} files. */
@Override
public Plan read() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index c26dd7601..fb7bfa131 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -247,6 +248,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
return this;
}
+    /**
+     * Forwards {@code registry} to the wrapped {@code snapshotReader}. Returns this wrapper, not
+     * the delegate, so subsequent chained calls still go through it.
+     */
+    @Override
+    public SnapshotReader withMetricRegistry(MetricRegistry registry) {
+        snapshotReader.withMetricRegistry(registry);
+        return this;
+    }
+
@Override
public Plan read() {
return snapshotReader.read();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
new file mode 100644
index 000000000..5b41d0258
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.metrics.Gauge;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.Metric;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricRegistryImpl;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.offset;
+
+/** Tests for {@link ScanMetrics}. */
+public class ScanMetricsTest {
+
+    private static final String TABLE_NAME = "myTable";
+
+    /** Tests the registration of the scan metrics. */
+    @Test
+    public void testGenericMetricsRegistration() {
+        ScanMetrics scanMetrics = getScanMetrics();
+        MetricGroup metricGroup = scanMetrics.getMetricGroup();
+        assertThat(metricGroup.getGroupName()).isEqualTo(ScanMetrics.GROUP_NAME);
+        Map<String, Metric> registeredMetrics = metricGroup.getMetrics();
+        assertThat(registeredMetrics.keySet())
+                .containsExactlyInAnyOrder(
+                        ScanMetrics.LAST_SCAN_DURATION,
+                        ScanMetrics.SCAN_DURATION,
+                        ScanMetrics.LAST_SCANNED_MANIFESTS,
+                        ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES,
+                        ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES,
+                        ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS,
+                        ScanMetrics.LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER,
+                        ScanMetrics.LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER);
+    }
+
+    /** Tests that the metrics are updated properly. */
+    @Test
+    public void testMetricsAreUpdated() {
+        ScanMetrics scanMetrics = getScanMetrics();
+        Map<String, Metric> registeredGenericMetrics = scanMetrics.getMetricGroup().getMetrics();
+
+        // Check initial values
+        Gauge<Long> lastScanDuration =
+                longGauge(registeredGenericMetrics, ScanMetrics.LAST_SCAN_DURATION);
+        Histogram scanDuration =
+                (Histogram) registeredGenericMetrics.get(ScanMetrics.SCAN_DURATION);
+        Gauge<Long> lastScannedManifests =
+                longGauge(registeredGenericMetrics, ScanMetrics.LAST_SCANNED_MANIFESTS);
+        Gauge<Long> lastSkippedByPartitionAndStats =
+                longGauge(
+                        registeredGenericMetrics, ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS);
+        Gauge<Long> lastSkippedByBucketAndLevelFilter =
+                longGauge(
+                        registeredGenericMetrics,
+                        ScanMetrics.LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER);
+        Gauge<Long> lastSkippedByWholeBucketFilesFilter =
+                longGauge(
+                        registeredGenericMetrics,
+                        ScanMetrics.LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER);
+        Gauge<Long> lastScanSkippedTableFiles =
+                longGauge(registeredGenericMetrics, ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES);
+        Gauge<Long> lastScanResultedTableFiles =
+                longGauge(registeredGenericMetrics, ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES);
+
+        assertThat(lastScanDuration.getValue()).isEqualTo(0);
+        assertThat(scanDuration.getCount()).isEqualTo(0);
+        assertThat(scanDuration.getStatistics().size()).isEqualTo(0);
+        assertThat(lastScannedManifests.getValue()).isEqualTo(0);
+        assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(0);
+        assertThat(lastSkippedByBucketAndLevelFilter.getValue()).isEqualTo(0);
+        assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(0);
+        assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(0);
+        assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(0);
+
+        // report once
+        reportOnce(scanMetrics);
+
+        // generic metrics value updated
+        assertThat(lastScanDuration.getValue()).isEqualTo(200);
+        assertThat(scanDuration.getCount()).isEqualTo(1);
+        assertThat(scanDuration.getStatistics().size()).isEqualTo(1);
+        assertThat(scanDuration.getStatistics().getValues()[0]).isEqualTo(200L);
+        assertThat(scanDuration.getStatistics().getMin()).isEqualTo(200);
+        assertThat(scanDuration.getStatistics().getQuantile(0.5)).isCloseTo(200.0, offset(0.001));
+        assertThat(scanDuration.getStatistics().getMean()).isEqualTo(200);
+        assertThat(scanDuration.getStatistics().getMax()).isEqualTo(200);
+        assertThat(scanDuration.getStatistics().getStdDev()).isEqualTo(0);
+        assertThat(lastScannedManifests.getValue()).isEqualTo(20);
+        assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(25);
+        assertThat(lastSkippedByBucketAndLevelFilter.getValue()).isEqualTo(40);
+        assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(32);
+        assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(97);
+        assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(10);
+
+        // report again
+        reportAgain(scanMetrics);
+
+        // generic metrics value updated
+        assertThat(lastScanDuration.getValue()).isEqualTo(500);
+        assertThat(scanDuration.getCount()).isEqualTo(2);
+        assertThat(scanDuration.getStatistics().size()).isEqualTo(2);
+        assertThat(scanDuration.getStatistics().getValues()[1]).isEqualTo(500L);
+        assertThat(scanDuration.getStatistics().getMin()).isEqualTo(200);
+        assertThat(scanDuration.getStatistics().getQuantile(0.5)).isCloseTo(350.0, offset(0.001));
+        assertThat(scanDuration.getStatistics().getMean()).isEqualTo(350);
+        assertThat(scanDuration.getStatistics().getMax()).isEqualTo(500);
+        assertThat(scanDuration.getStatistics().getStdDev()).isCloseTo(212.132, offset(0.001));
+        assertThat(lastScannedManifests.getValue()).isEqualTo(22);
+        assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(30);
+        assertThat(lastSkippedByBucketAndLevelFilter.getValue()).isEqualTo(42);
+        assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(33);
+        assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(105);
+        assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(8);
+    }
+
+    /** Looks up a registered gauge by name, typed as producing {@code Long} values. */
+    @SuppressWarnings("unchecked")
+    private static Gauge<Long> longGauge(Map<String, Metric> metrics, String name) {
+        // Unchecked cast is safe: every gauge ScanMetrics registers yields a long (0L or a getter).
+        return (Gauge<Long>) metrics.get(name);
+    }
+
+    private void reportOnce(ScanMetrics scanMetrics) {
+        ScanStats scanStats = new ScanStats(200, 20, 25, 40, 32, 10);
+        scanMetrics.reportScan(scanStats);
+    }
+
+    private void reportAgain(ScanMetrics scanMetrics) {
+        ScanStats scanStats = new ScanStats(500, 22, 30, 42, 33, 8);
+        scanMetrics.reportScan(scanStats);
+    }
+
+    private ScanMetrics getScanMetrics() {
+        return new ScanMetrics(new MetricRegistryImpl(), TABLE_NAME);
+    }
+}