This is an automated email from the ASF dual-hosted git repository.

xikai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ec187e5 feat: support collect statistics about the engine (#1451)
5ec187e5 is described below

commit 5ec187e5926f23ecafb6edaf13a843ad97ec22a9
Author: WEI Xikai <[email protected]>
AuthorDate: Fri Jan 26 15:29:30 2024 +0800

    feat: support collect statistics about the engine (#1451)
    
    ## Rationale
    Close #1438.
    
    ## Detailed Changes
    Support reporting statistics at the shard level.
    
    ## Test Plan
    New unit tests are added and the CI should pass.
---
 src/analytic_engine/src/engine.rs             | 143 +++++++++++++++++++++++++-
 src/analytic_engine/src/instance/write.rs     |   9 +-
 src/analytic_engine/src/sst/metrics.rs        |   9 +-
 src/analytic_engine/src/sst/parquet/writer.rs |   2 +-
 src/analytic_engine/src/table/data.rs         |   4 +-
 src/analytic_engine/src/table/metrics.rs      | 125 ++++++++++++----------
 src/benchmarks/src/merge_memtable_bench.rs    |   2 +-
 src/benchmarks/src/merge_sst_bench.rs         |   2 +-
 src/benchmarks/src/sst_bench.rs               |   2 +-
 src/benchmarks/src/sst_tools.rs               |   4 +-
 src/benchmarks/src/util.rs                    |   2 +-
 src/table_engine/src/engine.rs                |  16 +++
 src/tools/src/bin/sst-convert.rs              |   2 +-
 13 files changed, 248 insertions(+), 74 deletions(-)

diff --git a/src/analytic_engine/src/engine.rs 
b/src/analytic_engine/src/engine.rs
index 6cc9df58..6f873534 100644
--- a/src/analytic_engine/src/engine.rs
+++ b/src/analytic_engine/src/engine.rs
@@ -17,23 +17,31 @@
 
 //! Implements the TableEngine trait
 
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
 
 use async_trait::async_trait;
+use common_types::table::ShardId;
 use generic_error::BoxError;
 use logger::{error, info};
+use prometheus::{core::Collector, HistogramVec, IntCounterVec};
 use snafu::{OptionExt, ResultExt};
 use table_engine::{
     engine::{
         Close, CloseShardRequest, CloseTableRequest, CreateTableParams, 
CreateTableRequest,
         DropTableRequest, OpenShard, OpenShardRequest, OpenShardResult, 
OpenTableNoCause,
-        OpenTableRequest, OpenTableWithCause, Result, TableDef, TableEngine,
+        OpenTableRequest, OpenTableWithCause, Result, ShardStats, TableDef, 
TableEngine,
+        TableEngineStats, Unexpected,
     },
     table::{SchemaId, TableRef},
     ANALYTIC_ENGINE_TYPE,
 };
 
-use crate::{instance::InstanceRef, space::SpaceId, table::TableImpl};
+use crate::{
+    instance::InstanceRef,
+    space::SpaceId,
+    sst::metrics::FETCHED_SST_BYTES_HISTOGRAM,
+    table::{metrics::TABLE_WRITE_BYTES_COUNTER, TableImpl},
+};
 
 /// TableEngine implementation
 pub struct TableEngineImpl {
@@ -234,6 +242,63 @@ impl TableEngine for TableEngineImpl {
 
         self.close_tables_of_shard(close_requests).await
     }
+
+    async fn report_statistics(&self) -> Result<Option<TableEngineStats>> {
+        let table_engine_stats =
+            collect_stats_from_metric(&FETCHED_SST_BYTES_HISTOGRAM, 
&TABLE_WRITE_BYTES_COUNTER)?;
+
+        Ok(Some(table_engine_stats))
+    }
+}
+
+/// Collect the table engine stats from the two provided metric.
+fn collect_stats_from_metric(
+    fetched_bytes_hist: &HistogramVec,
+    written_bytes_counter: &IntCounterVec,
+) -> Result<TableEngineStats> {
+    let mut shard_stats: HashMap<ShardId, ShardStats> = HashMap::new();
+
+    // Collect the metrics for fetched bytes by shards.
+    for_shard_metric(fetched_bytes_hist, |shard_id, metric| {
+        let sum = metric.get_histogram().get_sample_sum() as u64;
+        let stats = shard_stats.entry(shard_id).or_default();
+        stats.num_fetched_bytes += sum;
+    })?;
+
+    // Collect the metrics for the written bytes by shards.
+    for_shard_metric(written_bytes_counter, |shard_id, metric| {
+        let sum = metric.get_counter().get_value() as u64;
+        let stats = shard_stats.entry(shard_id).or_default();
+        stats.num_written_bytes += sum;
+    })?;
+
+    Ok(TableEngineStats { shard_stats })
+}
+
+/// Iterate the metrics collected by `metric_collector`, and provide the metric
+/// with a valid shard_id to the `f` closure.
+fn for_shard_metric<C, F>(metric_collector: &C, mut f: F) -> Result<()>
+where
+    C: Collector,
+    F: FnMut(ShardId, &prometheus::proto::Metric),
+{
+    const SHARD_LABEL: &str = "shard_id";
+
+    let metric_families = metric_collector.collect();
+    for metric_family in metric_families {
+        for metric in metric_family.get_metric() {
+            let labels = metric.get_label();
+            let shard_id = labels
+                .iter()
+                .find_map(|pair| (pair.get_name() == SHARD_LABEL).then(|| 
pair.get_value()));
+            if let Some(raw_shard_id) = shard_id {
+                let shard_id: ShardId = 
str::parse(raw_shard_id).box_err().context(Unexpected)?;
+                f(shard_id, metric);
+            }
+        }
+    }
+
+    Ok(())
 }
 
 /// Generate the space id from the schema id with assumption schema id is 
unique
@@ -242,3 +307,75 @@ impl TableEngine for TableEngineImpl {
 pub fn build_space_id(schema_id: SchemaId) -> SpaceId {
     schema_id.as_u32()
 }
+
+#[cfg(test)]
+mod tests {
+    use prometheus::{exponential_buckets, register_histogram_vec, 
register_int_counter_vec};
+
+    use super::*;
+
+    #[test]
+    fn test_collect_table_engine_stats() {
+        let hist = register_histogram_vec!(
+            "fetched_bytes",
+            "Histogram for sst get range length",
+            &["shard_id", "table"],
+            // The buckets: [1MB, 2MB, 4MB, 8MB, ... , 8GB]
+            exponential_buckets(1024.0 * 1024.0, 2.0, 13).unwrap()
+        )
+        .unwrap();
+
+        hist.with_label_values(&["0", "table_0"]).observe(1000.0);
+        hist.with_label_values(&["0", "table_1"]).observe(1000.0);
+        hist.with_label_values(&["0", "table_2"]).observe(1000.0);
+        hist.with_label_values(&["1", "table_3"]).observe(1000.0);
+        hist.with_label_values(&["1", "table_4"]).observe(1000.0);
+        hist.with_label_values(&["2", "table_5"]).observe(4000.0);
+
+        let counter = register_int_counter_vec!(
+            "written_counter",
+            "Write bytes counter of table",
+            &["shard_id", "table"]
+        )
+        .unwrap();
+
+        counter.with_label_values(&["0", "table_0"]).inc_by(100);
+        counter.with_label_values(&["0", "table_1"]).inc_by(100);
+        counter.with_label_values(&["0", "table_2"]).inc_by(100);
+        counter.with_label_values(&["1", "table_3"]).inc_by(100);
+        counter.with_label_values(&["1", "table_4"]).inc_by(100);
+        counter.with_label_values(&["2", "table_5"]).inc_by(400);
+
+        let stats = collect_stats_from_metric(&hist, &counter).unwrap();
+
+        let expected_stats = {
+            let mut shard_stats: HashMap<ShardId, ShardStats> = HashMap::new();
+
+            shard_stats.insert(
+                0,
+                ShardStats {
+                    num_fetched_bytes: 3000,
+                    num_written_bytes: 300,
+                },
+            );
+            shard_stats.insert(
+                1,
+                ShardStats {
+                    num_fetched_bytes: 2000,
+                    num_written_bytes: 200,
+                },
+            );
+            shard_stats.insert(
+                2,
+                ShardStats {
+                    num_fetched_bytes: 4000,
+                    num_written_bytes: 400,
+                },
+            );
+
+            shard_stats
+        };
+
+        assert_eq!(stats.shard_stats, expected_stats);
+    }
+}
diff --git a/src/analytic_engine/src/instance/write.rs 
b/src/analytic_engine/src/instance/write.rs
index f94b72b9..95f2efce 100644
--- a/src/analytic_engine/src/instance/write.rs
+++ b/src/analytic_engine/src/instance/write.rs
@@ -555,9 +555,12 @@ impl<'a> Writer<'a> {
 
         // Collect metrics.
         let num_columns = row_group.schema().num_columns();
-        table_data
-            .metrics
-            .on_write_request_done(row_group.num_rows(), num_columns);
+        let num_written_bytes: usize = row_group.iter().map(|row| 
row.size()).sum();
+        table_data.metrics.on_write_request_done(
+            row_group.num_rows(),
+            num_columns,
+            num_written_bytes,
+        );
 
         Ok(())
     }
diff --git a/src/analytic_engine/src/sst/metrics.rs 
b/src/analytic_engine/src/sst/metrics.rs
index d4ae2072..f2eff571 100644
--- a/src/analytic_engine/src/sst/metrics.rs
+++ b/src/analytic_engine/src/sst/metrics.rs
@@ -54,10 +54,10 @@ lazy_static! {
         &["table"]
     ).unwrap();
 
-    static ref FETCHED_SST_BYTES_HISTOGRAM: HistogramVec = 
register_histogram_vec!(
+    pub static ref FETCHED_SST_BYTES_HISTOGRAM: HistogramVec = 
register_histogram_vec!(
         "fetched_sst_bytes",
         "Histogram for sst get range length",
-        &["table"],
+        &["shard_id", "table"],
         // The buckets: [1MB, 2MB, 4MB, 8MB, ... , 8GB]
         exponential_buckets(1024.0 * 1024.0, 2.0, 13).unwrap()
     ).unwrap();
@@ -72,13 +72,14 @@ pub struct MaybeTableLevelMetrics {
 }
 
 impl MaybeTableLevelMetrics {
-    pub fn new(table: &str) -> Self {
+    pub fn new(table: &str, shard_id_label: &str) -> Self {
         Self {
             row_group_before_prune_counter: ROW_GROUP_BEFORE_PRUNE_COUNTER
                 .with_label_values(&[table]),
             row_group_after_prune_counter: ROW_GROUP_AFTER_PRUNE_COUNTER
                 .with_label_values(&[table]),
-            num_fetched_sst_bytes_hist: 
FETCHED_SST_BYTES_HISTOGRAM.with_label_values(&[table]),
+            num_fetched_sst_bytes_hist: FETCHED_SST_BYTES_HISTOGRAM
+                .with_label_values(&[&shard_id_label, table]),
             num_fetched_sst_bytes: AtomicU64::new(0),
         }
     }
diff --git a/src/analytic_engine/src/sst/parquet/writer.rs 
b/src/analytic_engine/src/sst/parquet/writer.rs
index 88293497..eb2505ec 100644
--- a/src/analytic_engine/src/sst/parquet/writer.rs
+++ b/src/analytic_engine/src/sst/parquet/writer.rs
@@ -761,7 +761,7 @@ mod tests {
                 None,
             );
             let sst_read_options = SstReadOptions {
-                maybe_table_level_metrics: 
Arc::new(MaybeTableLevelMetrics::new("test")),
+                maybe_table_level_metrics: 
Arc::new(MaybeTableLevelMetrics::new("test", "0")),
                 frequency: ReadFrequency::Frequent,
                 num_rows_per_row_group: 5,
                 predicate: Arc::new(Predicate::empty()),
diff --git a/src/analytic_engine/src/table/data.rs 
b/src/analytic_engine/src/table/data.rs
index ececf946..bcb7ec1f 100644
--- a/src/analytic_engine/src/table/data.rs
+++ b/src/analytic_engine/src/table/data.rs
@@ -341,7 +341,7 @@ impl TableData {
         let purge_queue = purger.create_purge_queue(space_id, id);
         let current_version =
             TableVersion::new(mem_size_options.size_sampling_interval, 
purge_queue);
-        let metrics_ctx = MetricsContext::new(&name, metrics_opt);
+        let metrics_ctx = MetricsContext::new(&name, shard_id, metrics_opt);
         let metrics = Metrics::new(metrics_ctx);
         let mutable_limit = AtomicU32::new(compute_mutable_limit(
             opts.write_buffer_size,
@@ -421,7 +421,7 @@ impl TableData {
         let purge_queue = purger.create_purge_queue(add_meta.space_id, 
add_meta.table_id);
         let current_version =
             TableVersion::new(mem_size_options.size_sampling_interval, 
purge_queue);
-        let metrics_ctx = MetricsContext::new(&add_meta.table_name, 
metrics_opt);
+        let metrics_ctx = MetricsContext::new(&add_meta.table_name, shard_id, 
metrics_opt);
         let metrics = Metrics::new(metrics_ctx);
         let mutable_limit = AtomicU32::new(compute_mutable_limit(
             add_meta.opts.write_buffer_size,
diff --git a/src/analytic_engine/src/table/metrics.rs 
b/src/analytic_engine/src/table/metrics.rs
index f2164d1d..d1aa0c57 100644
--- a/src/analytic_engine/src/table/metrics.rs
+++ b/src/analytic_engine/src/table/metrics.rs
@@ -25,12 +25,13 @@ use std::{
     time::Duration,
 };
 
+use common_types::table::ShardId;
 use lazy_static::lazy_static;
 use prometheus::{
     exponential_buckets,
     local::{LocalHistogram, LocalHistogramTimer},
-    register_histogram, register_histogram_vec, register_int_counter, 
Histogram, HistogramTimer,
-    HistogramVec, IntCounter,
+    register_histogram, register_histogram_vec, register_int_counter, 
register_int_counter_vec,
+    Histogram, HistogramTimer, HistogramVec, IntCounter, IntCounterVec,
 };
 use table_engine::{partition::maybe_extract_partitioned_table_name, 
table::TableStats};
 
@@ -47,10 +48,10 @@ lazy_static! {
     )
     .unwrap();
 
-    static ref TABLE_WRITE_BATCH_HISTOGRAM: Histogram = register_histogram!(
-        "table_write_batch_size",
-        "Histogram of write batch size",
-        vec![10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0]
+    pub static ref TABLE_WRITE_BYTES_COUNTER: IntCounterVec = 
register_int_counter_vec!(
+        "table_write_bytes_counter",
+        "Write bytes counter of table",
+        &["shard_id", "table"]
     )
     .unwrap();
 
@@ -68,6 +69,13 @@ lazy_static! {
     // End of counters.
 
     // Histograms:
+    static ref TABLE_WRITE_BATCH_HISTOGRAM: Histogram = register_histogram!(
+        "table_write_batch_size",
+        "Histogram of write batch size",
+        vec![10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0]
+    )
+    .unwrap();
+
     // Buckets: 0, 0.002, .., 0.002 * 4^9
     static ref TABLE_FLUSH_DURATION_HISTOGRAM: Histogram = register_histogram!(
         "table_flush_duration",
@@ -170,6 +178,8 @@ impl From<&AtomicTableStats> for TableStats {
 pub struct Metrics {
     /// The table name used for metric label
     maybe_table_name: String,
+    /// The label for shard id
+    shard_id_label: String,
     /// Stats of a single table.
     stats: Arc<AtomicTableStats>,
 
@@ -190,47 +200,7 @@ pub struct Metrics {
     table_write_queue_waiter_duration: Histogram,
     table_write_queue_writer_duration: Histogram,
     table_write_total_duration: Histogram,
-}
-
-impl Default for Metrics {
-    fn default() -> Self {
-        Self {
-            maybe_table_name: DEFAULT_METRICS_KEY.to_string(),
-            stats: Arc::new(AtomicTableStats::default()),
-            compaction_input_sst_size_histogram: 
TABLE_COMPACTION_SST_SIZE_HISTOGRAM
-                .with_label_values(&["input"]),
-            compaction_output_sst_size_histogram: 
TABLE_COMPACTION_SST_SIZE_HISTOGRAM
-                .with_label_values(&["output"]),
-            compaction_input_sst_row_num_histogram: 
TABLE_COMPACTION_SST_ROW_NUM_HISTOGRAM
-                .with_label_values(&["input"]),
-            compaction_output_sst_row_num_histogram: 
TABLE_COMPACTION_SST_ROW_NUM_HISTOGRAM
-                .with_label_values(&["output"]),
-
-            table_write_stall_duration: TABLE_WRITE_DURATION_HISTOGRAM
-                .with_label_values(&["stall"]),
-            table_write_encode_duration: TABLE_WRITE_DURATION_HISTOGRAM
-                .with_label_values(&["encode"]),
-            table_write_wal_duration: 
TABLE_WRITE_DURATION_HISTOGRAM.with_label_values(&["wal"]),
-            table_write_memtable_duration: TABLE_WRITE_DURATION_HISTOGRAM
-                .with_label_values(&["memtable"]),
-            table_write_preprocess_duration: TABLE_WRITE_DURATION_HISTOGRAM
-                .with_label_values(&["preprocess"]),
-            table_write_space_flush_wait_duration: 
TABLE_WRITE_DURATION_HISTOGRAM
-                .with_label_values(&["wait_space_flush"]),
-            table_write_instance_flush_wait_duration: 
TABLE_WRITE_DURATION_HISTOGRAM
-                .with_label_values(&["wait_instance_flush"]),
-            table_write_flush_wait_duration: TABLE_WRITE_DURATION_HISTOGRAM
-                .with_label_values(&["wait_flush"]),
-            table_write_execute_duration: TABLE_WRITE_DURATION_HISTOGRAM
-                .with_label_values(&["execute"]),
-            table_write_queue_waiter_duration: TABLE_WRITE_DURATION_HISTOGRAM
-                .with_label_values(&["queue_waiter"]),
-            table_write_queue_writer_duration: TABLE_WRITE_DURATION_HISTOGRAM
-                .with_label_values(&["queue_writer"]),
-            table_write_total_duration: TABLE_WRITE_DURATION_HISTOGRAM
-                .with_label_values(&["total"]),
-        }
-    }
+    table_write_bytes_counter: IntCounter,
 }
 
 pub struct MaybeTableLevelMetrics {
@@ -244,8 +214,11 @@ pub struct MaybeTableLevelMetrics {
 }
 
 impl MaybeTableLevelMetrics {
-    pub fn new(maybe_table_name: &str) -> Self {
-        let sst_metrics = 
Arc::new(SstMaybeTableLevelMetrics::new(maybe_table_name));
+    pub fn new(maybe_table_name: &str, shard_id_label: &str) -> Self {
+        let sst_metrics = Arc::new(SstMaybeTableLevelMetrics::new(
+            maybe_table_name,
+            shard_id_label,
+        ));
 
         Self {
             query_time_range: 
QUERY_TIME_RANGE.with_label_values(&[maybe_table_name]),
@@ -260,14 +233,16 @@ pub struct MetricsContext<'a> {
     /// If enable table level metrics, it should be a table name,
     /// Otherwise it should be `DEFAULT_METRICS_KEY`.
     table_name: &'a str,
+    shard_id: ShardId,
     metric_opt: MetricsOptions,
     maybe_partitioned_table_name: Option<String>,
 }
 
 impl<'a> MetricsContext<'a> {
-    pub fn new(table_name: &'a str, metric_opt: MetricsOptions) -> Self {
+    pub fn new(table_name: &'a str, shard_id: ShardId, metric_opt: 
MetricsOptions) -> Self {
         Self {
             table_name,
+            shard_id,
             metric_opt,
             maybe_partitioned_table_name: None,
         }
@@ -291,16 +266,57 @@ impl<'a> MetricsContext<'a> {
 
 impl Metrics {
     pub fn new(mut metric_ctx: MetricsContext) -> Self {
+        let shard_id_label = metric_ctx.shard_id.to_string();
+        let maybe_table_name = metric_ctx.maybe_table_name().to_string();
+        let table_write_bytes_counter =
+            TABLE_WRITE_BYTES_COUNTER.with_label_values(&[&shard_id_label, 
&maybe_table_name]);
         Self {
-            maybe_table_name: metric_ctx.maybe_table_name().to_string(),
-            ..Default::default()
+            maybe_table_name,
+            shard_id_label,
+            stats: Arc::new(AtomicTableStats::default()),
+            compaction_input_sst_size_histogram: 
TABLE_COMPACTION_SST_SIZE_HISTOGRAM
+                .with_label_values(&["input"]),
+            compaction_output_sst_size_histogram: 
TABLE_COMPACTION_SST_SIZE_HISTOGRAM
+                .with_label_values(&["output"]),
+            compaction_input_sst_row_num_histogram: 
TABLE_COMPACTION_SST_ROW_NUM_HISTOGRAM
+                .with_label_values(&["input"]),
+            compaction_output_sst_row_num_histogram: 
TABLE_COMPACTION_SST_ROW_NUM_HISTOGRAM
+                .with_label_values(&["output"]),
+
+            table_write_stall_duration: TABLE_WRITE_DURATION_HISTOGRAM
+                .with_label_values(&["stall"]),
+            table_write_encode_duration: TABLE_WRITE_DURATION_HISTOGRAM
+                .with_label_values(&["encode"]),
+            table_write_wal_duration: 
TABLE_WRITE_DURATION_HISTOGRAM.with_label_values(&["wal"]),
+            table_write_memtable_duration: TABLE_WRITE_DURATION_HISTOGRAM
+                .with_label_values(&["memtable"]),
+            table_write_preprocess_duration: TABLE_WRITE_DURATION_HISTOGRAM
+                .with_label_values(&["preprocess"]),
+            table_write_space_flush_wait_duration: 
TABLE_WRITE_DURATION_HISTOGRAM
+                .with_label_values(&["wait_space_flush"]),
+            table_write_instance_flush_wait_duration: 
TABLE_WRITE_DURATION_HISTOGRAM
+                .with_label_values(&["wait_instance_flush"]),
+            table_write_flush_wait_duration: TABLE_WRITE_DURATION_HISTOGRAM
+                .with_label_values(&["wait_flush"]),
+            table_write_execute_duration: TABLE_WRITE_DURATION_HISTOGRAM
+                .with_label_values(&["execute"]),
+            table_write_queue_waiter_duration: TABLE_WRITE_DURATION_HISTOGRAM
+                .with_label_values(&["queue_waiter"]),
+            table_write_queue_writer_duration: TABLE_WRITE_DURATION_HISTOGRAM
+                .with_label_values(&["queue_writer"]),
+            table_write_total_duration: TABLE_WRITE_DURATION_HISTOGRAM
+                .with_label_values(&["total"]),
+            table_write_bytes_counter,
         }
     }
 
     /// Generate a table-level metric observer.
     #[inline]
     pub fn maybe_table_level_metrics(&self) -> Arc<MaybeTableLevelMetrics> {
-        Arc::new(MaybeTableLevelMetrics::new(&self.maybe_table_name))
+        Arc::new(MaybeTableLevelMetrics::new(
+            &self.maybe_table_name,
+            &self.shard_id_label,
+        ))
     }
 
     #[inline]
@@ -315,9 +331,10 @@ impl Metrics {
     }
 
     #[inline]
-    pub fn on_write_request_done(&self, num_rows: usize, num_columns: usize) {
+    pub fn on_write_request_done(&self, num_rows: usize, num_columns: usize, 
num_bytes: usize) {
         TABLE_WRITE_BATCH_HISTOGRAM.observe(num_rows as f64);
         TABLE_WRITE_FIELDS_COUNTER.inc_by((num_columns * num_rows) as u64);
+        self.table_write_bytes_counter.inc_by(num_bytes as u64);
     }
 
     #[inline]
diff --git a/src/benchmarks/src/merge_memtable_bench.rs 
b/src/benchmarks/src/merge_memtable_bench.rs
index 2cec5681..50f72545 100644
--- a/src/benchmarks/src/merge_memtable_bench.rs
+++ b/src/benchmarks/src/merge_memtable_bench.rs
@@ -219,7 +219,7 @@ fn mock_sst_read_options_builder(
         max_record_batches_in_flight: 1024,
         num_streams_to_prefetch: 0,
     };
-    let maybe_table_level_metrics = 
Arc::new(SstMaybeTableLevelMetrics::new("bench"));
+    let maybe_table_level_metrics = 
Arc::new(SstMaybeTableLevelMetrics::new("bench", ""));
 
     SstReadOptionsBuilder::new(
         ScanType::Query,
diff --git a/src/benchmarks/src/merge_sst_bench.rs 
b/src/benchmarks/src/merge_sst_bench.rs
index 7bbb364f..c71da73e 100644
--- a/src/benchmarks/src/merge_sst_bench.rs
+++ b/src/benchmarks/src/merge_sst_bench.rs
@@ -83,7 +83,7 @@ impl MergeSstBench {
             num_streams_to_prefetch: 0,
         };
 
-        let maybe_table_level_metrics = 
Arc::new(SstMaybeTableLevelMetrics::new("bench"));
+        let maybe_table_level_metrics = 
Arc::new(SstMaybeTableLevelMetrics::new("bench", ""));
         let scan_type = ScanType::Query;
         let sst_read_options_builder = SstReadOptionsBuilder::new(
             scan_type,
diff --git a/src/benchmarks/src/sst_bench.rs b/src/benchmarks/src/sst_bench.rs
index a778d106..88cfcc46 100644
--- a/src/benchmarks/src/sst_bench.rs
+++ b/src/benchmarks/src/sst_bench.rs
@@ -64,7 +64,7 @@ impl SstBench {
             max_record_batches_in_flight: 1024,
             num_streams_to_prefetch: 0,
         };
-        let maybe_table_level_metrics = 
Arc::new(SstMaybeTableLevelMetrics::new("bench"));
+        let maybe_table_level_metrics = 
Arc::new(SstMaybeTableLevelMetrics::new("bench", ""));
         let sst_read_options_builder = SstReadOptionsBuilder::new(
             ScanType::Query,
             scan_options,
diff --git a/src/benchmarks/src/sst_tools.rs b/src/benchmarks/src/sst_tools.rs
index 527e4598..c40324bf 100644
--- a/src/benchmarks/src/sst_tools.rs
+++ b/src/benchmarks/src/sst_tools.rs
@@ -133,7 +133,7 @@ pub async fn rebuild_sst(config: RebuildSstConfig, runtime: 
Arc<Runtime>) {
     let table_schema = projected_schema.table_schema().clone();
     let row_projector_builder = RowProjectorBuilder::new(fetched_schema, 
table_schema, None);
     let sst_read_options = SstReadOptions {
-        maybe_table_level_metrics: 
Arc::new(SstMaybeTableLevelMetrics::new("bench")),
+        maybe_table_level_metrics: 
Arc::new(SstMaybeTableLevelMetrics::new("bench", "")),
         frequency: ReadFrequency::Once,
         num_rows_per_row_group: config.num_rows_per_row_group,
         predicate: config.predicate.into_predicate(),
@@ -246,7 +246,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: 
Arc<Runtime>) {
     let sst_factory: SstFactoryRef = Arc::new(FactoryImpl);
     let store_picker: ObjectStorePickerRef = Arc::new(store);
     let projected_schema = ProjectedSchema::no_projection(schema.clone());
-    let maybe_table_level_metrics = 
Arc::new(SstMaybeTableLevelMetrics::new("bench"));
+    let maybe_table_level_metrics = 
Arc::new(SstMaybeTableLevelMetrics::new("bench", ""));
     let sst_read_options_builder = SstReadOptionsBuilder::new(
         ScanType::Query,
         scan_options,
diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs
index 39503685..c173935f 100644
--- a/src/benchmarks/src/util.rs
+++ b/src/benchmarks/src/util.rs
@@ -132,7 +132,7 @@ pub async fn load_sst_to_memtable(
     let table_schema = projected_schema.table_schema().clone();
     let row_projector_builder = RowProjectorBuilder::new(fetched_schema, 
table_schema, None);
     let sst_read_options = SstReadOptions {
-        maybe_table_level_metrics: 
Arc::new(SstMaybeTableLevelMetrics::new("bench")),
+        maybe_table_level_metrics: 
Arc::new(SstMaybeTableLevelMetrics::new("bench", "")),
         frequency: ReadFrequency::Frequent,
         num_rows_per_row_group: 8192,
         predicate: Arc::new(Predicate::empty()),
diff --git a/src/table_engine/src/engine.rs b/src/table_engine/src/engine.rs
index 1fb77155..2a9ba800 100644
--- a/src/table_engine/src/engine.rs
+++ b/src/table_engine/src/engine.rs
@@ -306,6 +306,17 @@ pub struct TableDef {
 
 pub type CloseShardRequest = OpenShardRequest;
 
+#[derive(Clone, Debug, Default, PartialEq, Eq)]
+pub struct ShardStats {
+    pub num_written_bytes: u64,
+    pub num_fetched_bytes: u64,
+}
+
+#[derive(Debug, Default)]
+pub struct TableEngineStats {
+    pub shard_stats: HashMap<ShardId, ShardStats>,
+}
+
 /// Table engine
 // TODO(yingwen): drop table support to release resource owned by the table
 #[async_trait]
@@ -339,6 +350,11 @@ pub trait TableEngine: Send + Sync {
 
     /// Close tables on same shard.
     async fn close_shard(&self, request: CloseShardRequest) -> 
Vec<Result<String>>;
+
+    /// Report the statistics of the table engine.
+    async fn report_statistics(&self) -> Result<Option<TableEngineStats>> {
+        Ok(None)
+    }
 }
 
 pub type OpenShardResult = HashMap<TableId, GenericResult<Option<TableRef>>>;
diff --git a/src/tools/src/bin/sst-convert.rs b/src/tools/src/bin/sst-convert.rs
index 691fd2a5..68671b84 100644
--- a/src/tools/src/bin/sst-convert.rs
+++ b/src/tools/src/bin/sst-convert.rs
@@ -104,7 +104,7 @@ async fn run(args: Args, runtime: Arc<Runtime>) -> 
Result<()> {
     let table_schema = projected_schema.table_schema().clone();
     let row_projector_builder = RowProjectorBuilder::new(fetched_schema, 
table_schema, None);
     let reader_opts = SstReadOptions {
-        maybe_table_level_metrics: 
Arc::new(SstMaybeTableLevelMetrics::new("tool")),
+        maybe_table_level_metrics: 
Arc::new(SstMaybeTableLevelMetrics::new("tool", "")),
         frequency: ReadFrequency::Once,
         num_rows_per_row_group: 8192,
         predicate: Arc::new(Predicate::empty()),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to