This is an automated email from the ASF dual-hosted git repository.

xikai pushed a commit to branch memtable-poc
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git

commit 9abd326d89fab9f4bd89ddf69898b3238fac03fe
Author: WEI Xikai <[email protected]>
AuthorDate: Thu Dec 21 17:42:35 2023 +0800

    fix: no write stall (#1388)
    
    In an earlier change, the write stall was removed to avoid generating
    small ssts, but the write stall is also removed in the normal write
    path.
    
    Introduce the `min_flush_interval` to avoid frequent flush requests and
    recover the write stall mechanism.
    
    Add unit tests for the frequent flush check.
---
 analytic_engine/src/compaction/scheduler.rs      |   4 +
 analytic_engine/src/instance/flush_compaction.rs | 138 ++++++++++++++++++++++-
 analytic_engine/src/instance/mod.rs              |  14 +++
 analytic_engine/src/instance/open.rs             |   2 +
 analytic_engine/src/instance/serial_executor.rs  |   6 +-
 analytic_engine/src/instance/wal_replayer.rs     |   3 +-
 analytic_engine/src/instance/write.rs            |   5 +-
 analytic_engine/src/lib.rs                       |   3 +
 analytic_engine/src/table/data.rs                |   5 +-
 9 files changed, 167 insertions(+), 13 deletions(-)

diff --git a/analytic_engine/src/compaction/scheduler.rs 
b/analytic_engine/src/compaction/scheduler.rs
index 1496ef51..9f6bdb80 100644
--- a/analytic_engine/src/compaction/scheduler.rs
+++ b/analytic_engine/src/compaction/scheduler.rs
@@ -306,6 +306,7 @@ impl SchedulerImpl {
         runtime: Arc<Runtime>,
         config: SchedulerConfig,
         write_sst_max_buffer_size: usize,
+        min_flush_interval_ms: u64,
         scan_options: ScanOptions,
     ) -> Self {
         let (tx, rx) = mpsc::channel(config.schedule_channel_len);
@@ -321,6 +322,7 @@ impl SchedulerImpl {
             max_ongoing_tasks: config.max_ongoing_tasks,
             max_unflushed_duration: config.max_unflushed_duration.0,
             write_sst_max_buffer_size,
+            min_flush_interval_ms,
             scan_options,
             limit: Arc::new(OngoingTaskLimit {
                 ongoing_tasks: AtomicUsize::new(0),
@@ -399,6 +401,7 @@ struct ScheduleWorker {
     picker_manager: PickerManager,
     max_ongoing_tasks: usize,
     write_sst_max_buffer_size: usize,
+    min_flush_interval_ms: u64,
     scan_options: ScanOptions,
     limit: Arc<OngoingTaskLimit>,
     running: Arc<AtomicBool>,
@@ -664,6 +667,7 @@ impl ScheduleWorker {
             space_store: self.space_store.clone(),
             runtime: self.runtime.clone(),
             write_sst_max_buffer_size: self.write_sst_max_buffer_size,
+            min_flush_interval_ms: Some(self.min_flush_interval_ms),
         };
 
         for table_data in &tables_buf {
diff --git a/analytic_engine/src/instance/flush_compaction.rs 
b/analytic_engine/src/instance/flush_compaction.rs
index 7e960dab..5b7ec2e8 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -208,6 +208,9 @@ pub struct Flusher {
 
     pub runtime: RuntimeRef,
     pub write_sst_max_buffer_size: usize,
+    /// If the interval is set, it will generate a [`FlushTask`] with min flush
+    /// interval check.
+    pub min_flush_interval_ms: Option<u64>,
 }
 
 struct FlushTask {
@@ -215,6 +218,22 @@ struct FlushTask {
     table_data: TableDataRef,
     runtime: RuntimeRef,
     write_sst_max_buffer_size: usize,
+    // If the interval is set, it will be used to check whether flush is too 
frequent.
+    min_flush_interval_ms: Option<u64>,
+}
+
+/// The checker to determine whether a flush is frequent.
+struct FrequentFlushChecker {
+    min_flush_interval_ms: u64,
+    last_flush_time_ms: u64,
+}
+
+impl FrequentFlushChecker {
+    #[inline]
+    fn is_frequent_flush(&self) -> bool {
+        let now = time_ext::current_time_millis();
+        self.last_flush_time_ms + self.min_flush_interval_ms > now
+    }
 }
 
 impl Flusher {
@@ -263,6 +282,7 @@ impl Flusher {
             space_store: self.space_store.clone(),
             runtime: self.runtime.clone(),
             write_sst_max_buffer_size: self.write_sst_max_buffer_size,
+            min_flush_interval_ms: self.min_flush_interval_ms,
         };
         let flush_job = async move { flush_task.run().await };
 
@@ -276,6 +296,16 @@ impl FlushTask {
     /// Each table can only have one running flush task at the same time, which
     /// should be ensured by the caller.
     async fn run(&self) -> Result<()> {
+        let large_enough = self.table_data.should_flush_table(false);
+        if !large_enough && self.is_frequent_flush() {
+            debug!(
+                "Ignore flush task for too frequent flush of small memtable, 
table:{}",
+                self.table_data.name
+            );
+
+            return Ok(());
+        }
+
         let instant = Instant::now();
         let flush_req = self.preprocess_flush(&self.table_data).await?;
 
@@ -315,6 +345,18 @@ impl FlushTask {
         Ok(())
     }
 
+    fn is_frequent_flush(&self) -> bool {
+        if let Some(min_flush_interval_ms) = self.min_flush_interval_ms {
+            let checker = FrequentFlushChecker {
+                min_flush_interval_ms,
+                last_flush_time_ms: self.table_data.last_flush_time(),
+            };
+            checker.is_frequent_flush()
+        } else {
+            false
+        }
+    }
+
     async fn preprocess_flush(&self, table_data: &TableDataRef) -> 
Result<TableFlushRequest> {
         let current_version = table_data.current_version();
         let mut last_sequence = table_data.last_sequence();
@@ -1169,7 +1211,14 @@ mod tests {
         time::TimeRange,
     };
 
-    use crate::instance::flush_compaction::split_record_batch_with_time_ranges;
+    use super::{collect_column_stats_from_meta_datas, FrequentFlushChecker};
+    use crate::{
+        instance::flush_compaction::split_record_batch_with_time_ranges,
+        sst::{
+            meta_data::SstMetaData,
+            parquet::meta_data::{ColumnValueSet, ParquetMetaData},
+        },
+    };
 
     #[test]
     fn test_split_record_batch_with_time_ranges() {
@@ -1222,4 +1271,91 @@ mod tests {
         check_record_batch_with_key_with_rows(&rets[1], rows1.len(), 
column_num, rows1);
         check_record_batch_with_key_with_rows(&rets[2], rows2.len(), 
column_num, rows2);
     }
+
+    fn check_collect_column_stats(
+        schema: &Schema,
+        expected_low_cardinality_col_indexes: Vec<usize>,
+        meta_datas: Vec<SstMetaData>,
+    ) {
+        let column_stats = collect_column_stats_from_meta_datas(&meta_datas);
+        assert_eq!(
+            column_stats.len(),
+            expected_low_cardinality_col_indexes.len()
+        );
+
+        for col_idx in expected_low_cardinality_col_indexes {
+            let col_schema = schema.column(col_idx);
+            assert!(column_stats.contains_key(&col_schema.name));
+        }
+    }
+
+    #[test]
+    fn test_collect_column_stats_from_metadata() {
+        let schema = build_schema();
+        let build_meta_data = |low_cardinality_col_indexes: Vec<usize>| {
+            let mut column_values = vec![None; 6];
+            for idx in low_cardinality_col_indexes {
+                column_values[idx] = 
Some(ColumnValueSet::StringValue(Default::default()));
+            }
+            let parquet_meta_data = ParquetMetaData {
+                min_key: Bytes::new(),
+                max_key: Bytes::new(),
+                time_range: TimeRange::empty(),
+                max_sequence: 0,
+                schema: schema.clone(),
+                parquet_filter: None,
+                column_values: Some(column_values),
+            };
+            SstMetaData::Parquet(Arc::new(parquet_meta_data))
+        };
+
+        // Normal case 0
+        let meta_datas = vec![
+            build_meta_data(vec![0]),
+            build_meta_data(vec![0]),
+            build_meta_data(vec![0, 1]),
+            build_meta_data(vec![0, 2]),
+            build_meta_data(vec![0, 3]),
+        ];
+        check_collect_column_stats(&schema, vec![0], meta_datas);
+
+        // Normal case 1
+        let meta_datas = vec![
+            build_meta_data(vec![0]),
+            build_meta_data(vec![0]),
+            build_meta_data(vec![]),
+            build_meta_data(vec![1]),
+            build_meta_data(vec![3]),
+        ];
+        check_collect_column_stats(&schema, vec![], meta_datas);
+
+        // Normal case 2
+        let meta_datas = vec![
+            build_meta_data(vec![3, 5]),
+            build_meta_data(vec![0, 3, 5]),
+            build_meta_data(vec![0, 1, 2, 3, 5]),
+            build_meta_data(vec![1, 3, 5]),
+        ];
+        check_collect_column_stats(&schema, vec![3, 5], meta_datas);
+    }
+
+    #[test]
+    fn test_frequent_flush() {
+        let now = time_ext::current_time_millis();
+        let cases = vec![
+            (now - 1000, 100, false),
+            (now - 1000, 2000, true),
+            (now - 10000, 200, false),
+            (now - 2000, 2000, false),
+            (now + 2000, 1000, true),
+        ];
+        for (last_flush_time_ms, min_flush_interval_ms, expect) in cases {
+            let checker = FrequentFlushChecker {
+                min_flush_interval_ms,
+                last_flush_time_ms,
+            };
+
+            assert_eq!(expect, checker.is_frequent_flush());
+        }
+    }
 }
diff --git a/analytic_engine/src/instance/mod.rs 
b/analytic_engine/src/instance/mod.rs
index 75408a86..85841f4c 100644
--- a/analytic_engine/src/instance/mod.rs
+++ b/analytic_engine/src/instance/mod.rs
@@ -176,6 +176,8 @@ pub struct Instance {
     pub(crate) replay_batch_size: usize,
     /// Write sst max buffer size
     pub(crate) write_sst_max_buffer_size: usize,
+    /// The min interval between flushes
+    pub(crate) min_flush_interval: ReadableDuration,
     /// Max retry limit to flush memtables
     pub(crate) max_retry_flush_limit: usize,
     /// Max bytes per write batch
@@ -309,6 +311,18 @@ impl Instance {
             // Do flush in write runtime
             runtime: self.runtimes.write_runtime.clone(),
             write_sst_max_buffer_size: self.write_sst_max_buffer_size,
+            min_flush_interval_ms: None,
+        }
+    }
+
+    #[inline]
+    fn make_flusher_with_min_interval(&self) -> Flusher {
+        Flusher {
+            space_store: self.space_store.clone(),
+            // Do flush in write runtime
+            runtime: self.runtimes.write_runtime.clone(),
+            write_sst_max_buffer_size: self.write_sst_max_buffer_size,
+            min_flush_interval_ms: Some(self.min_flush_interval.as_millis()),
         }
     }
 
diff --git a/analytic_engine/src/instance/open.rs 
b/analytic_engine/src/instance/open.rs
index adf024db..fa5e6bae 100644
--- a/analytic_engine/src/instance/open.rs
+++ b/analytic_engine/src/instance/open.rs
@@ -125,6 +125,7 @@ impl Instance {
             compaction_runtime,
             scheduler_config,
             ctx.config.write_sst_max_buffer_size.as_byte() as usize,
+            ctx.config.min_flush_interval.as_millis(),
             scan_options_for_compaction,
         ));
 
@@ -152,6 +153,7 @@ impl Instance {
             space_write_buffer_size: ctx.config.space_write_buffer_size,
             replay_batch_size: ctx.config.replay_batch_size,
             write_sst_max_buffer_size: 
ctx.config.write_sst_max_buffer_size.as_byte() as usize,
+            min_flush_interval: ctx.config.min_flush_interval,
             max_retry_flush_limit: ctx.config.max_retry_flush_limit,
             mem_usage_sampling_interval: 
ctx.config.mem_usage_sampling_interval,
             max_bytes_per_write_batch: ctx
diff --git a/analytic_engine/src/instance/serial_executor.rs 
b/analytic_engine/src/instance/serial_executor.rs
index 56dcfc27..99fc3dbb 100644
--- a/analytic_engine/src/instance/serial_executor.rs
+++ b/analytic_engine/src/instance/serial_executor.rs
@@ -167,11 +167,7 @@ impl TableFlushScheduler {
                         *flush_state = FlushState::Flushing;
                         break;
                     }
-                    FlushState::Flushing => {
-                        if !block_on_write_thread {
-                            return Ok(());
-                        }
-                    }
+                    FlushState::Flushing => {}
                     FlushState::Failed { err_msg } => {
                         if self
                             .schedule_sync
diff --git a/analytic_engine/src/instance/wal_replayer.rs 
b/analytic_engine/src/instance/wal_replayer.rs
index 41ea3fb8..082b66d5 100644
--- a/analytic_engine/src/instance/wal_replayer.rs
+++ b/analytic_engine/src/instance/wal_replayer.rs
@@ -542,7 +542,8 @@ async fn replay_table_log_entries(
                 }
 
                 // Flush the table if necessary.
-                if table_data.should_flush_table(serial_exec) {
+                let in_flush = serial_exec.flush_scheduler().is_in_flush();
+                if table_data.should_flush_table(in_flush) {
                     let opts = TableFlushOptions {
                         res_sender: None,
                         max_retry_flush_limit,
diff --git a/analytic_engine/src/instance/write.rs 
b/analytic_engine/src/instance/write.rs
index ed738f86..e49ccaac 100644
--- a/analytic_engine/src/instance/write.rs
+++ b/analytic_engine/src/instance/write.rs
@@ -620,7 +620,8 @@ impl<'a> Writer<'a> {
             }
         }
 
-        if self.table_data.should_flush_table(self.serial_exec) {
+        let in_flush = self.serial_exec.flush_scheduler().is_in_flush();
+        if self.table_data.should_flush_table(in_flush) {
             let table_data = self.table_data.clone();
             let _timer = 
table_data.metrics.start_table_write_flush_wait_timer();
             self.handle_memtable_flush(&table_data).await?;
@@ -673,7 +674,7 @@ impl<'a> Writer<'a> {
             res_sender: None,
             max_retry_flush_limit: self.instance.max_retry_flush_limit(),
         };
-        let flusher = self.instance.make_flusher();
+        let flusher = self.instance.make_flusher_with_min_interval();
         if table_data.id == self.table_data.id {
             let flush_scheduler = self.serial_exec.flush_scheduler();
             // Set `block_on_write_thread` to false and let flush do in 
background.
diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs
index 015ccac0..5782d96e 100644
--- a/analytic_engine/src/lib.rs
+++ b/analytic_engine/src/lib.rs
@@ -115,6 +115,8 @@ pub struct Config {
     pub write_sst_max_buffer_size: ReadableSize,
     /// Max retry limit After flush failed
     pub max_retry_flush_limit: usize,
+    /// The min interval between two consecutive flushes
+    pub min_flush_interval: ReadableDuration,
     /// Max bytes per write batch.
     ///
     /// If this is set, the atomicity of write request will be broken.
@@ -210,6 +212,7 @@ impl Default for Config {
             scan_max_record_batches_in_flight: 1024,
             write_sst_max_buffer_size: ReadableSize::mb(10),
             max_retry_flush_limit: 0,
+            min_flush_interval: ReadableDuration::minutes(1),
             max_bytes_per_write_batch: None,
             mem_usage_sampling_interval: ReadableDuration::secs(0),
             wal_encode: WalEncodeConfig::default(),
diff --git a/analytic_engine/src/table/data.rs 
b/analytic_engine/src/table/data.rs
index 864137d6..06e01b57 100644
--- a/analytic_engine/src/table/data.rs
+++ b/analytic_engine/src/table/data.rs
@@ -637,9 +637,7 @@ impl TableData {
     }
 
     /// Returns true if the memory usage of this table reaches flush threshold
-    ///
-    /// REQUIRE: Do in write worker
-    pub fn should_flush_table(&self, serial_exec: &mut TableOpSerialExecutor) 
-> bool {
+    pub fn should_flush_table(&self, in_flush: bool) -> bool {
         // Fallback to usize::MAX if Failed to convert arena_block_size into
         // usize (overflow)
         let max_write_buffer_size = self
@@ -655,7 +653,6 @@ impl TableData {
 
         let mutable_usage = self.current_version.mutable_memory_usage();
         let total_usage = self.current_version.total_memory_usage();
-        let in_flush = serial_exec.flush_scheduler().is_in_flush();
         // Inspired by 
https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h#L94
         if mutable_usage > mutable_limit && !in_flush {
             info!(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to