This is an automated email from the ASF dual-hosted git repository. xikai pushed a commit to branch memtable-poc in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
commit 9abd326d89fab9f4bd89ddf69898b3238fac03fe Author: WEI Xikai <[email protected]> AuthorDate: Thu Dec 21 17:42:35 2023 +0800 fix: no write stall (#1388) small ssts, but the write stall is also removed in the normal write path. Introduce the `min_flush_interval` to avoid frequent flush requests and recover the write stall mechanism. Add unit tests for the frequent flush check. --- analytic_engine/src/compaction/scheduler.rs | 4 + analytic_engine/src/instance/flush_compaction.rs | 138 ++++++++++++++++++++++- analytic_engine/src/instance/mod.rs | 14 +++ analytic_engine/src/instance/open.rs | 2 + analytic_engine/src/instance/serial_executor.rs | 6 +- analytic_engine/src/instance/wal_replayer.rs | 3 +- analytic_engine/src/instance/write.rs | 5 +- analytic_engine/src/lib.rs | 3 + analytic_engine/src/table/data.rs | 5 +- 9 files changed, 167 insertions(+), 13 deletions(-) diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index 1496ef51..9f6bdb80 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -306,6 +306,7 @@ impl SchedulerImpl { runtime: Arc<Runtime>, config: SchedulerConfig, write_sst_max_buffer_size: usize, + min_flush_interval_ms: u64, scan_options: ScanOptions, ) -> Self { let (tx, rx) = mpsc::channel(config.schedule_channel_len); @@ -321,6 +322,7 @@ impl SchedulerImpl { max_ongoing_tasks: config.max_ongoing_tasks, max_unflushed_duration: config.max_unflushed_duration.0, write_sst_max_buffer_size, + min_flush_interval_ms, scan_options, limit: Arc::new(OngoingTaskLimit { ongoing_tasks: AtomicUsize::new(0), @@ -399,6 +401,7 @@ struct ScheduleWorker { picker_manager: PickerManager, max_ongoing_tasks: usize, write_sst_max_buffer_size: usize, + min_flush_interval_ms: u64, scan_options: ScanOptions, limit: Arc<OngoingTaskLimit>, running: Arc<AtomicBool>, @@ -664,6 +667,7 @@ impl ScheduleWorker { space_store: self.space_store.clone(), runtime: self.runtime.clone(), write_sst_max_buffer_size: self.write_sst_max_buffer_size, + min_flush_interval_ms: Some(self.min_flush_interval_ms), }; for table_data in &tables_buf { diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 7e960dab..5b7ec2e8 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -208,6 +208,9 @@ pub struct Flusher { pub runtime: RuntimeRef, pub write_sst_max_buffer_size: usize, + /// If the interval is set, it will generate a [`FlushTask`] with min flush + /// interval check. + pub min_flush_interval_ms: Option<u64>, } struct FlushTask { @@ -215,6 +218,22 @@ struct FlushTask { table_data: TableDataRef, runtime: RuntimeRef, write_sst_max_buffer_size: usize, + // If the interval is set, it will be used to check whether flush is too frequent. + min_flush_interval_ms: Option<u64>, +} + +/// The checker to determine whether a flush is frequent. +struct FrequentFlushChecker { + min_flush_interval_ms: u64, + last_flush_time_ms: u64, +} + +impl FrequentFlushChecker { + #[inline] + fn is_frequent_flush(&self) -> bool { + let now = time_ext::current_time_millis(); + self.last_flush_time_ms + self.min_flush_interval_ms > now + } } impl Flusher { @@ -263,6 +282,7 @@ impl Flusher { space_store: self.space_store.clone(), runtime: self.runtime.clone(), write_sst_max_buffer_size: self.write_sst_max_buffer_size, + min_flush_interval_ms: self.min_flush_interval_ms, }; let flush_job = async move { flush_task.run().await }; @@ -276,6 +296,16 @@ impl FlushTask { /// Each table can only have one running flush task at the same time, which /// should be ensured by the caller. async fn run(&self) -> Result<()> { + let large_enough = self.table_data.should_flush_table(false); + if !large_enough && self.is_frequent_flush() { + debug!( + "Ignore flush task for too frequent flush of small memtable, table:{}", + self.table_data.name + ); + + return Ok(()); + } + let instant = Instant::now(); let flush_req = self.preprocess_flush(&self.table_data).await?; @@ -315,6 +345,18 @@ impl FlushTask { Ok(()) } + fn is_frequent_flush(&self) -> bool { + if let Some(min_flush_interval_ms) = self.min_flush_interval_ms { + let checker = FrequentFlushChecker { + min_flush_interval_ms, + last_flush_time_ms: self.table_data.last_flush_time(), + }; + checker.is_frequent_flush() + } else { + false + } + } + async fn preprocess_flush(&self, table_data: &TableDataRef) -> Result<TableFlushRequest> { let current_version = table_data.current_version(); let mut last_sequence = table_data.last_sequence(); @@ -1169,7 +1211,14 @@ mod tests { time::TimeRange, }; - use crate::instance::flush_compaction::split_record_batch_with_time_ranges; + use super::{collect_column_stats_from_meta_datas, FrequentFlushChecker}; + use crate::{ + instance::flush_compaction::split_record_batch_with_time_ranges, + sst::{ + meta_data::SstMetaData, + parquet::meta_data::{ColumnValueSet, ParquetMetaData}, + }, + }; #[test] fn test_split_record_batch_with_time_ranges() { @@ -1222,4 +1271,91 @@ mod tests { check_record_batch_with_key_with_rows(&rets[1], rows1.len(), column_num, rows1); check_record_batch_with_key_with_rows(&rets[2], rows2.len(), column_num, rows2); } + + fn check_collect_column_stats( + schema: &Schema, + expected_low_cardinality_col_indexes: Vec<usize>, + meta_datas: Vec<SstMetaData>, + ) { + let column_stats = collect_column_stats_from_meta_datas(&meta_datas); + assert_eq!( + column_stats.len(), + expected_low_cardinality_col_indexes.len() + ); + + for col_idx in expected_low_cardinality_col_indexes { + let col_schema = schema.column(col_idx); + assert!(column_stats.contains_key(&col_schema.name)); + } + } + + #[test] + fn test_collect_column_stats_from_metadata() { + let schema = build_schema(); + let build_meta_data = |low_cardinality_col_indexes: Vec<usize>| { + let mut column_values = vec![None; 6]; + for idx in low_cardinality_col_indexes { + column_values[idx] = Some(ColumnValueSet::StringValue(Default::default())); + } + let parquet_meta_data = ParquetMetaData { + min_key: Bytes::new(), + max_key: Bytes::new(), + time_range: TimeRange::empty(), + max_sequence: 0, + schema: schema.clone(), + parquet_filter: None, + column_values: Some(column_values), + }; + SstMetaData::Parquet(Arc::new(parquet_meta_data)) + }; + + // Normal case 0 + let meta_datas = vec![ + build_meta_data(vec![0]), + build_meta_data(vec![0]), + build_meta_data(vec![0, 1]), + build_meta_data(vec![0, 2]), + build_meta_data(vec![0, 3]), + ]; + check_collect_column_stats(&schema, vec![0], meta_datas); + + // Normal case 1 + let meta_datas = vec![ + build_meta_data(vec![0]), + build_meta_data(vec![0]), + build_meta_data(vec![]), + build_meta_data(vec![1]), + build_meta_data(vec![3]), + ]; + check_collect_column_stats(&schema, vec![], meta_datas); + + // Normal case 2 + let meta_datas = vec![ + build_meta_data(vec![3, 5]), + build_meta_data(vec![0, 3, 5]), + build_meta_data(vec![0, 1, 2, 3, 5]), + build_meta_data(vec![1, 3, 5]), + ]; + check_collect_column_stats(&schema, vec![3, 5], meta_datas); + } + + #[test] + fn test_frequent_flush() { + let now = time_ext::current_time_millis(); + let cases = vec![ + (now - 1000, 100, false), + (now - 1000, 2000, true), + (now - 10000, 200, false), + (now - 2000, 2000, false), + (now + 2000, 1000, true), + ]; + for (last_flush_time_ms, min_flush_interval_ms, expect) in cases { + let checker = FrequentFlushChecker { + min_flush_interval_ms, + last_flush_time_ms, + }; + + assert_eq!(expect, checker.is_frequent_flush()); + } + } } diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 75408a86..85841f4c 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -176,6 +176,8 @@ pub struct Instance { pub(crate) replay_batch_size: usize, /// Write sst max buffer size pub(crate) write_sst_max_buffer_size: usize, + /// The min interval between flushes + pub(crate) min_flush_interval: ReadableDuration, /// Max retry limit to flush memtables pub(crate) max_retry_flush_limit: usize, /// Max bytes per write batch @@ -309,6 +311,18 @@ impl Instance { // Do flush in write runtime runtime: self.runtimes.write_runtime.clone(), write_sst_max_buffer_size: self.write_sst_max_buffer_size, + min_flush_interval_ms: None, + } + } + + #[inline] + fn make_flusher_with_min_interval(&self) -> Flusher { + Flusher { + space_store: self.space_store.clone(), + // Do flush in write runtime + runtime: self.runtimes.write_runtime.clone(), + write_sst_max_buffer_size: self.write_sst_max_buffer_size, + min_flush_interval_ms: Some(self.min_flush_interval.as_millis()), } } diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index adf024db..fa5e6bae 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -125,6 +125,7 @@ impl Instance { compaction_runtime, scheduler_config, ctx.config.write_sst_max_buffer_size.as_byte() as usize, + ctx.config.min_flush_interval.as_millis(), scan_options_for_compaction, )); @@ -152,6 +153,7 @@ impl Instance { space_write_buffer_size: ctx.config.space_write_buffer_size, replay_batch_size: ctx.config.replay_batch_size, write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_byte() as usize, + min_flush_interval: ctx.config.min_flush_interval, max_retry_flush_limit: ctx.config.max_retry_flush_limit, mem_usage_sampling_interval: ctx.config.mem_usage_sampling_interval, max_bytes_per_write_batch: ctx diff --git a/analytic_engine/src/instance/serial_executor.rs b/analytic_engine/src/instance/serial_executor.rs index 56dcfc27..99fc3dbb 100644 --- a/analytic_engine/src/instance/serial_executor.rs +++ b/analytic_engine/src/instance/serial_executor.rs @@ -167,11 +167,7 @@ impl TableFlushScheduler { *flush_state = FlushState::Flushing; break; } - FlushState::Flushing => { - if !block_on_write_thread { - return Ok(()); - } - } + FlushState::Flushing => {} FlushState::Failed { err_msg } => { if self .schedule_sync diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index 41ea3fb8..082b66d5 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -542,7 +542,8 @@ async fn replay_table_log_entries( } // Flush the table if necessary. - if table_data.should_flush_table(serial_exec) { + let in_flush = serial_exec.flush_scheduler().is_in_flush(); + if table_data.should_flush_table(in_flush) { let opts = TableFlushOptions { res_sender: None, max_retry_flush_limit, diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index ed738f86..e49ccaac 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -620,7 +620,8 @@ impl<'a> Writer<'a> { } } - if self.table_data.should_flush_table(self.serial_exec) { + let in_flush = self.serial_exec.flush_scheduler().is_in_flush(); + if self.table_data.should_flush_table(in_flush) { let table_data = self.table_data.clone(); let _timer = table_data.metrics.start_table_write_flush_wait_timer(); self.handle_memtable_flush(&table_data).await?; @@ -673,7 +674,7 @@ impl<'a> Writer<'a> { res_sender: None, max_retry_flush_limit: self.instance.max_retry_flush_limit(), }; - let flusher = self.instance.make_flusher(); + let flusher = self.instance.make_flusher_with_min_interval(); if table_data.id == self.table_data.id { let flush_scheduler = self.serial_exec.flush_scheduler(); // Set `block_on_write_thread` to false and let flush do in background. diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index 015ccac0..5782d96e 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -115,6 +115,8 @@ pub struct Config { pub write_sst_max_buffer_size: ReadableSize, /// Max retry limit After flush failed pub max_retry_flush_limit: usize, + /// The min interval between two consecutive flushes + pub min_flush_interval: ReadableDuration, /// Max bytes per write batch. /// /// If this is set, the atomicity of write request will be broken. @@ -210,6 +212,7 @@ impl Default for Config { scan_max_record_batches_in_flight: 1024, write_sst_max_buffer_size: ReadableSize::mb(10), max_retry_flush_limit: 0, + min_flush_interval: ReadableDuration::minutes(1), max_bytes_per_write_batch: None, mem_usage_sampling_interval: ReadableDuration::secs(0), wal_encode: WalEncodeConfig::default(), diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index 864137d6..06e01b57 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -637,9 +637,7 @@ impl TableData { } /// Returns true if the memory usage of this table reaches flush threshold - /// - /// REQUIRE: Do in write worker - pub fn should_flush_table(&self, serial_exec: &mut TableOpSerialExecutor) -> bool { + pub fn should_flush_table(&self, in_flush: bool) -> bool { // Fallback to usize::MAX if Failed to convert arena_block_size into // usize (overflow) let max_write_buffer_size = self @@ -655,7 +653,6 @@ impl TableData { let mutable_usage = self.current_version.mutable_memory_usage(); let total_usage = self.current_version.total_memory_usage(); - let in_flush = serial_exec.flush_scheduler().is_in_flush(); // Inspired by https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h#L94 if mutable_usage > mutable_limit && !in_flush { info!( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
