This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 683a245cf [Core] support scan metrics (#2170)
683a245cf is described below

commit 683a245cfd88a82a36ea9fad804aebe521b354d9
Author: GuojunLi <[email protected]>
AuthorDate: Wed Nov 1 18:41:04 2023 +0800

    [Core] support scan metrics (#2170)
    
    This closes #2170.
---
 .../paimon/operation/AbstractFileStoreScan.java    |  50 ++++++-
 .../org/apache/paimon/operation/FileStoreScan.java |   3 +
 .../paimon/operation/metrics/ScanMetrics.java      |  95 +++++++++++++
 .../apache/paimon/operation/metrics/ScanStats.java |  87 ++++++++++++
 .../paimon/table/AbstractFileStoreTable.java       |   3 +-
 .../table/source/snapshot/SnapshotReader.java      |   3 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |  14 +-
 .../apache/paimon/table/system/AuditLogTable.java  |   7 +
 .../paimon/operation/metrics/ScanMetricsTest.java  | 153 +++++++++++++++++++++
 9 files changed, 406 insertions(+), 9 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index dd638a187..8bd1fa2a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -27,6 +27,8 @@ import org.apache.paimon.manifest.ManifestEntrySerializer;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.metrics.ScanMetrics;
+import org.apache.paimon.operation.metrics.ScanStats;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
@@ -49,6 +51,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -80,6 +83,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     private ManifestCacheFilter manifestCacheFilter = null;
     private final Integer scanManifestParallelism;
 
+    private ScanMetrics scanMetrics = null;
+
     public AbstractFileStoreScan(
             RowType partitionType,
             ScanBucketFilter bucketKeyFilter,
@@ -188,6 +193,12 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan withMetrics(ScanMetrics metrics) {
+        this.scanMetrics = metrics;
+        return this;
+    }
+
     @Override
     public Plan plan() {
 
@@ -223,6 +234,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
 
     private Pair<Snapshot, List<ManifestEntry>> doPlan(
             Function<ManifestFileMeta, List<ManifestEntry>> readManifest) {
+        long started = System.nanoTime();
         List<ManifestFileMeta> manifests = specifiedManifests;
         Snapshot snapshot = null;
         if (manifests == null) {
@@ -237,19 +249,29 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
             }
         }
 
+        long startDataFiles =
+                manifests.stream().mapToLong(f -> f.numAddedFiles() + f.numDeletedFiles()).sum();
+
+        AtomicLong cntEntries = new AtomicLong(0);
         Iterable<ManifestEntry> entries =
                 ParallellyExecuteUtils.parallelismBatchIterable(
-                        files ->
-                                files.parallelStream()
-                                        .filter(this::filterManifestFileMeta)
-                                        .flatMap(m -> readManifest.apply(m).stream())
-                                        .filter(this::filterByStats)
-                                        .collect(Collectors.toList()),
+                        files -> {
+                            List<ManifestEntry> entryList =
+                                    files.parallelStream()
+                                            .filter(this::filterManifestFileMeta)
+                                            .flatMap(m -> readManifest.apply(m).stream())
+                                            .filter(this::filterByStats)
+                                            .collect(Collectors.toList());
+                            cntEntries.getAndAdd(entryList.size());
+                            return entryList;
+                        },
                         manifests,
                         scanManifestParallelism);
 
         List<ManifestEntry> files = new ArrayList<>();
-        for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) {
+        Collection<ManifestEntry> mergedEntries = ManifestEntry.mergeEntries(entries);
+        long skippedByPartitionAndStats = startDataFiles - cntEntries.get();
+        for (ManifestEntry file : mergedEntries) {
             if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
                 String partInfo =
                         partitionType.getFieldCount() > 0
@@ -278,6 +300,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
             }
         }
 
+        long afterBucketFilter = files.size();
+        long skippedByBucketAndLevelFilter = mergedEntries.size() - files.size();
         // We group files by bucket here, and filter them by the whole bucket filter.
         // Why do this: because in primary key table, we can't just filter the value
         // by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
@@ -296,6 +320,18 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
                         .flatMap(Collection::stream)
                         .collect(Collectors.toList());
 
+        long skippedByWholeBucketFiles = afterBucketFilter - files.size();
+        long scanDuration = (System.nanoTime() - started) / 1_000_000;
+        if (scanMetrics != null) {
+            scanMetrics.reportScan(
+                    new ScanStats(
+                            scanDuration,
+                            manifests.size(),
+                            skippedByPartitionAndStats,
+                            skippedByBucketAndLevelFilter,
+                            skippedByWholeBucketFiles,
+                            files.size()));
+        }
         return Pair.of(snapshot, files);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 9fce20710..0d11fa605 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.metrics.ScanMetrics;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.Filter;
@@ -62,6 +63,8 @@ public interface FileStoreScan {
 
     FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);
 
+    FileStoreScan withMetrics(ScanMetrics metrics);
+
     /** Produce a {@link Plan}. */
     Plan plan();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
new file mode 100644
index 000000000..f67bde96c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricRegistry;
+
+/** Metrics to measure scan operation. */
+public class ScanMetrics {
+    private static final int HISTOGRAM_WINDOW_SIZE = 10_000;
+    @VisibleForTesting protected static final String GROUP_NAME = "scan";
+    private final MetricGroup metricGroup;
+
+    public ScanMetrics(MetricRegistry registry, String tableName) {
+        this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
+        registerGenericScanMetrics();
+    }
+
+    @VisibleForTesting
+    public MetricGroup getMetricGroup() {
+        return metricGroup;
+    }
+
+    private Histogram durationHistogram;
+
+    private ScanStats latestScan;
+
+    @VisibleForTesting static final String LAST_SCAN_DURATION = "lastScanDuration";
+    @VisibleForTesting static final String SCAN_DURATION = "scanDuration";
+    @VisibleForTesting static final String LAST_SCANNED_MANIFESTS = "lastScannedManifests";
+
+    @VisibleForTesting
+    static final String LAST_SKIPPED_BY_PARTITION_AND_STATS = "lastSkippedByPartitionAndStats";
+
+    @VisibleForTesting
+    static final String LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER =
+            "lastSkippedByBucketAndLevelFilter";
+
+    @VisibleForTesting
+    static final String LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER =
+            "lastSkippedByWholeBucketFilesFilter";
+
+    @VisibleForTesting
+    static final String LAST_SCAN_SKIPPED_TABLE_FILES = "lastScanSkippedTableFiles";
+
+    @VisibleForTesting
+    static final String LAST_SCAN_RESULTED_TABLE_FILES = "lastScanResultedTableFiles";
+
+    private void registerGenericScanMetrics() {
+        metricGroup.gauge(
+                LAST_SCAN_DURATION, () -> latestScan == null ? 0L : latestScan.getDuration());
+        durationHistogram = metricGroup.histogram(SCAN_DURATION, HISTOGRAM_WINDOW_SIZE);
+        metricGroup.gauge(
+                LAST_SCANNED_MANIFESTS,
+                () -> latestScan == null ? 0L : latestScan.getScannedManifests());
+        metricGroup.gauge(
+                LAST_SKIPPED_BY_PARTITION_AND_STATS,
+                () -> latestScan == null ? 0L : latestScan.getSkippedByPartitionAndStats());
+        metricGroup.gauge(
+                LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER,
+                () -> latestScan == null ? 0L : latestScan.getSkippedByBucketAndLevelFilter());
+        metricGroup.gauge(
+                LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER,
+                () -> latestScan == null ? 0L : latestScan.getSkippedByWholeBucketFiles());
+        metricGroup.gauge(
+                LAST_SCAN_SKIPPED_TABLE_FILES,
+                () -> latestScan == null ? 0L : latestScan.getSkippedTableFiles());
+        metricGroup.gauge(
+                LAST_SCAN_RESULTED_TABLE_FILES,
+                () -> latestScan == null ? 0L : latestScan.getResultedTableFiles());
+    }
+
+    public void reportScan(ScanStats scanStats) {
+        latestScan = scanStats;
+        durationHistogram.update(scanStats.getDuration());
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
new file mode 100644
index 000000000..b94643459
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+
+/** Statistics for a scan operation. */
+public class ScanStats {
+    private final long duration;
+    private final long scannedManifests;
+    private final long skippedByPartitionAndStats;
+    private final long skippedByBucketAndLevelFilter;
+
+    private final long skippedByWholeBucketFiles;
+    private final long skippedTableFiles;
+    private final long resultedTableFiles;
+
+    public ScanStats(
+            long duration,
+            long scannedManifests,
+            long skippedByPartitionAndStats,
+            long skippedByBucketAndLevelFilter,
+            long skippedByWholeBucketFiles,
+            long resultedTableFiles) {
+        this.duration = duration;
+        this.scannedManifests = scannedManifests;
+        this.skippedByPartitionAndStats = skippedByPartitionAndStats;
+        this.skippedByBucketAndLevelFilter = skippedByBucketAndLevelFilter;
+        this.skippedByWholeBucketFiles = skippedByWholeBucketFiles;
+        this.skippedTableFiles =
+                skippedByPartitionAndStats
+                        + skippedByBucketAndLevelFilter
+                        + skippedByWholeBucketFiles;
+        this.resultedTableFiles = resultedTableFiles;
+    }
+
+    @VisibleForTesting
+    protected long getScannedManifests() {
+        return scannedManifests;
+    }
+
+    @VisibleForTesting
+    protected long getSkippedTableFiles() {
+        return skippedTableFiles;
+    }
+
+    @VisibleForTesting
+    protected long getResultedTableFiles() {
+        return resultedTableFiles;
+    }
+
+    @VisibleForTesting
+    protected long getSkippedByPartitionAndStats() {
+        return skippedByPartitionAndStats;
+    }
+
+    @VisibleForTesting
+    protected long getSkippedByBucketAndLevelFilter() {
+        return skippedByBucketAndLevelFilter;
+    }
+
+    @VisibleForTesting
+    protected long getSkippedByWholeBucketFiles() {
+        return skippedByWholeBucketFiles;
+    }
+
+    @VisibleForTesting
+    protected long getDuration() {
+        return duration;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 596b62109..305a63b5f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -135,7 +135,8 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
                 snapshotManager(),
                 splitGenerator(),
                 nonPartitionFilterConsumer(),
-                DefaultValueAssigner.create(tableSchema));
+                DefaultValueAssigner.create(tableSchema),
+                name());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index d53a7d2e3..ab55deba1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source.snapshot;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
@@ -57,6 +58,8 @@ public interface SnapshotReader {
 
     SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);
 
+    SnapshotReader withMetricRegistry(MetricRegistry registry);
+
     /** Get splits plan from snapshot. */
     Plan read();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 70673ca6d..aeb1143c3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -28,8 +28,10 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.metrics.ScanMetrics;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.TableSchema;
@@ -72,6 +74,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
     private ScanMode scanMode = ScanMode.ALL;
     private RecordComparator lazyPartitionComparator;
 
+    private final String tableName;
+
     public SnapshotReaderImpl(
             FileStoreScan scan,
             TableSchema tableSchema,
@@ -79,7 +83,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
             SnapshotManager snapshotManager,
             SplitGenerator splitGenerator,
             BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,
-            DefaultValueAssigner defaultValueAssigner) {
+            DefaultValueAssigner defaultValueAssigner,
+            String tableName) {
+        this.tableName = tableName;
         this.scan = scan;
         this.tableSchema = tableSchema;
         this.options = options;
@@ -173,6 +179,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    @Override
+    public SnapshotReader withMetricRegistry(MetricRegistry registry) {
+        scan.withMetrics(new ScanMetrics(registry, tableName));
+        return this;
+    }
+
     /** Get splits from {@link FileKind#ADD} files. */
     @Override
     public Plan read() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index c26dd7601..fb7bfa131 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -247,6 +248,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader withMetricRegistry(MetricRegistry registry) {
+            snapshotReader.withMetricRegistry(registry);
+            return this;
+        }
+
         @Override
         public Plan read() {
             return snapshotReader.read();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
new file mode 100644
index 000000000..5b41d0258
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.metrics.Gauge;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.Metric;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricRegistryImpl;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.offset;
+
+/** Tests for {@link ScanMetrics}. */
+public class ScanMetricsTest {
+    private static final String TABLE_NAME = "myTable";
+
+    /** Tests the registration of the commit metrics. */
+    @Test
+    public void testGenericMetricsRegistration() {
+        ScanMetrics scanMetrics = getScanMetrics();
+        MetricGroup metricGroup = scanMetrics.getMetricGroup();
+        assertThat(metricGroup.getGroupName()).isEqualTo(ScanMetrics.GROUP_NAME);
+        Map<String, Metric> registeredMetrics = metricGroup.getMetrics();
+        assertThat(registeredMetrics.keySet())
+                .containsExactlyInAnyOrder(
+                        ScanMetrics.LAST_SCAN_DURATION,
+                        ScanMetrics.SCAN_DURATION,
+                        ScanMetrics.LAST_SCANNED_MANIFESTS,
+                        ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES,
+                        ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES,
+                        ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS,
+                        ScanMetrics.LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER,
+                        ScanMetrics.LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER);
+    }
+
+    /** Tests that the metrics are updated properly. */
+    @Test
+    public void testMetricsAreUpdated() {
+        ScanMetrics scanMetrics = getScanMetrics();
+        Map<String, Metric> registeredGenericMetrics = scanMetrics.getMetricGroup().getMetrics();
+
+        // Check initial values
+        Gauge<Long> lastScanDuration =
+                (Gauge<Long>) registeredGenericMetrics.get(ScanMetrics.LAST_SCAN_DURATION);
+        Histogram scanDuration =
+                (Histogram) registeredGenericMetrics.get(ScanMetrics.SCAN_DURATION);
+        Gauge<Long> lastScannedManifests =
+                (Gauge<Long>) registeredGenericMetrics.get(ScanMetrics.LAST_SCANNED_MANIFESTS);
+        Gauge<Long> lastSkippedByPartitionAndStats =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(
+                                ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS);
+        Gauge<Long> lastSkippedByBucketAndLevelFilter =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(
+                                ScanMetrics.LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER);
+        Gauge<Long> lastSkippedByWholeBucketFilesFilter =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(
+                                ScanMetrics.LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER);
+        Gauge<Long> lastScanSkippedTableFiles =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES);
+        Gauge<Long> lastScanResultedTableFiles =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES);
+
+        assertThat(lastScanDuration.getValue()).isEqualTo(0);
+        assertThat(scanDuration.getCount()).isEqualTo(0);
+        assertThat(scanDuration.getStatistics().size()).isEqualTo(0);
+        assertThat(lastScannedManifests.getValue()).isEqualTo(0);
+        assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(0);
+        assertThat(lastSkippedByBucketAndLevelFilter.getValue()).isEqualTo(0);
+        assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(0);
+        assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(0);
+        assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(0);
+
+        // report once
+        reportOnce(scanMetrics);
+
+        // generic metrics value updated
+        assertThat(lastScanDuration.getValue()).isEqualTo(200);
+        assertThat(scanDuration.getCount()).isEqualTo(1);
+        assertThat(scanDuration.getStatistics().size()).isEqualTo(1);
+        assertThat(scanDuration.getStatistics().getValues()[0]).isEqualTo(200L);
+        assertThat(scanDuration.getStatistics().getMin()).isEqualTo(200);
+        assertThat(scanDuration.getStatistics().getQuantile(0.5)).isCloseTo(200.0, offset(0.001));
+        assertThat(scanDuration.getStatistics().getMean()).isEqualTo(200);
+        assertThat(scanDuration.getStatistics().getMax()).isEqualTo(200);
+        assertThat(scanDuration.getStatistics().getStdDev()).isEqualTo(0);
+        assertThat(lastScannedManifests.getValue()).isEqualTo(20);
+        assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(25);
+        assertThat(lastSkippedByBucketAndLevelFilter.getValue()).isEqualTo(40);
+        assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(32);
+        assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(97);
+        assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(10);
+
+        // report again
+        reportAgain(scanMetrics);
+
+        // generic metrics value updated
+        assertThat(lastScanDuration.getValue()).isEqualTo(500);
+        assertThat(scanDuration.getCount()).isEqualTo(2);
+        assertThat(scanDuration.getStatistics().size()).isEqualTo(2);
+        assertThat(scanDuration.getStatistics().getValues()[1]).isEqualTo(500L);
+        assertThat(scanDuration.getStatistics().getMin()).isEqualTo(200);
+        assertThat(scanDuration.getStatistics().getQuantile(0.5)).isCloseTo(350.0, offset(0.001));
+        assertThat(scanDuration.getStatistics().getMean()).isEqualTo(350);
+        assertThat(scanDuration.getStatistics().getMax()).isEqualTo(500);
+        assertThat(scanDuration.getStatistics().getStdDev()).isCloseTo(212.132, offset(0.001));
+        assertThat(lastScannedManifests.getValue()).isEqualTo(22);
+        assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(30);
+        assertThat(lastSkippedByBucketAndLevelFilter.getValue()).isEqualTo(42);
+        assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(33);
+        assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(105);
+        assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(8);
+    }
+
+    private void reportOnce(ScanMetrics scanMetrics) {
+        ScanStats scanStats = new ScanStats(200, 20, 25, 40, 32, 10);
+        scanMetrics.reportScan(scanStats);
+    }
+
+    private void reportAgain(ScanMetrics scanMetrics) {
+        ScanStats scanStats = new ScanStats(500, 22, 30, 42, 33, 8);
+        scanMetrics.reportScan(scanStats);
+    }
+
+    private ScanMetrics getScanMetrics() {
+        return new ScanMetrics(new MetricRegistryImpl(), TABLE_NAME);
+    }
+}

Reply via email to