This is an automated email from the ASF dual-hosted git repository.

tanruixiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3f5d8f45 fix: no write stall (#1388)
3f5d8f45 is described below

commit 3f5d8f450168815671c28858b4ec74378ce4f633
Author: WEI Xikai <[email protected]>
AuthorDate: Thu Dec 21 17:42:35 2023 +0800

    fix: no write stall (#1388)
    
    ## Rationale
    #1003 tried to avoid frequent flush requests, which may generate a large
    number of small SSTs, but it also removed the write stall from the normal
    write path.
    
    ## Detailed Changes
    Introduce `min_flush_interval` to avoid frequent flush requests and
    restore the write stall mechanism.
    
    ## Test Plan
    Add unit tests for the frequent flush check.
---
 analytic_engine/src/compaction/scheduler.rs      |  4 ++
 analytic_engine/src/instance/flush_compaction.rs | 64 +++++++++++++++++++++++-
 analytic_engine/src/instance/mod.rs              | 14 ++++++
 analytic_engine/src/instance/open.rs             |  2 +
 analytic_engine/src/instance/serial_executor.rs  |  6 +--
 analytic_engine/src/instance/wal_replayer.rs     |  3 +-
 analytic_engine/src/instance/write.rs            |  5 +-
 analytic_engine/src/lib.rs                       |  3 ++
 analytic_engine/src/table/data.rs                |  5 +-
 9 files changed, 93 insertions(+), 13 deletions(-)

diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs
index 72fe2f8b..d242b443 100644
--- a/analytic_engine/src/compaction/scheduler.rs
+++ b/analytic_engine/src/compaction/scheduler.rs
@@ -306,6 +306,7 @@ impl SchedulerImpl {
         runtime: Arc<Runtime>,
         config: SchedulerConfig,
         write_sst_max_buffer_size: usize,
+        min_flush_interval_ms: u64,
         scan_options: ScanOptions,
     ) -> Self {
         let (tx, rx) = mpsc::channel(config.schedule_channel_len);
@@ -321,6 +322,7 @@ impl SchedulerImpl {
             max_ongoing_tasks: config.max_ongoing_tasks,
             max_unflushed_duration: config.max_unflushed_duration.0,
             write_sst_max_buffer_size,
+            min_flush_interval_ms,
             scan_options,
             limit: Arc::new(OngoingTaskLimit {
                 ongoing_tasks: AtomicUsize::new(0),
@@ -399,6 +401,7 @@ struct ScheduleWorker {
     picker_manager: PickerManager,
     max_ongoing_tasks: usize,
     write_sst_max_buffer_size: usize,
+    min_flush_interval_ms: u64,
     scan_options: ScanOptions,
     limit: Arc<OngoingTaskLimit>,
     running: Arc<AtomicBool>,
@@ -665,6 +668,7 @@ impl ScheduleWorker {
             space_store: self.space_store.clone(),
             runtime: self.runtime.clone(),
             write_sst_max_buffer_size: self.write_sst_max_buffer_size,
+            min_flush_interval_ms: Some(self.min_flush_interval_ms),
         };
 
         for table_data in &tables_buf {
diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs
index cbede944..7094bb35 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -213,6 +213,9 @@ pub struct Flusher {
 
     pub runtime: RuntimeRef,
     pub write_sst_max_buffer_size: usize,
+    /// If the interval is set, it will generate a [`FlushTask`] with min flush
+    /// interval check.
+    pub min_flush_interval_ms: Option<u64>,
 }
 
 struct FlushTask {
@@ -220,6 +223,22 @@ struct FlushTask {
     table_data: TableDataRef,
     runtime: RuntimeRef,
     write_sst_max_buffer_size: usize,
+    // If the interval is set, it will be used to check whether flush is too 
frequent.
+    min_flush_interval_ms: Option<u64>,
+}
+
+/// The checker to determine whether a flush is frequent.
+struct FrequentFlushChecker {
+    min_flush_interval_ms: u64,
+    last_flush_time_ms: u64,
+}
+
+impl FrequentFlushChecker {
+    #[inline]
+    fn is_frequent_flush(&self) -> bool {
+        let now = time_ext::current_time_millis();
+        self.last_flush_time_ms + self.min_flush_interval_ms > now
+    }
 }
 
 impl Flusher {
@@ -268,6 +287,7 @@ impl Flusher {
             space_store: self.space_store.clone(),
             runtime: self.runtime.clone(),
             write_sst_max_buffer_size: self.write_sst_max_buffer_size,
+            min_flush_interval_ms: self.min_flush_interval_ms,
         };
         let flush_job = async move { flush_task.run().await };
 
@@ -281,6 +301,16 @@ impl FlushTask {
     /// Each table can only have one running flush task at the same time, which
     /// should be ensured by the caller.
     async fn run(&self) -> Result<()> {
+        let large_enough = self.table_data.should_flush_table(false);
+        if !large_enough && self.is_frequent_flush() {
+            debug!(
+                "Ignore flush task for too frequent flush of small memtable, 
table:{}",
+                self.table_data.name
+            );
+
+            return Ok(());
+        }
+
         let instant = Instant::now();
         let flush_req = self.preprocess_flush(&self.table_data).await?;
 
@@ -320,6 +350,18 @@ impl FlushTask {
         Ok(())
     }
 
+    fn is_frequent_flush(&self) -> bool {
+        if let Some(min_flush_interval_ms) = self.min_flush_interval_ms {
+            let checker = FrequentFlushChecker {
+                min_flush_interval_ms,
+                last_flush_time_ms: self.table_data.last_flush_time(),
+            };
+            checker.is_frequent_flush()
+        } else {
+            false
+        }
+    }
+
     async fn preprocess_flush(&self, table_data: &TableDataRef) -> 
Result<TableFlushRequest> {
         let current_version = table_data.current_version();
         let mut last_sequence = table_data.last_sequence();
@@ -1190,7 +1232,7 @@ mod tests {
         time::TimeRange,
     };
 
-    use super::collect_column_stats_from_meta_datas;
+    use super::{collect_column_stats_from_meta_datas, FrequentFlushChecker};
     use crate::{
         instance::flush_compaction::split_record_batch_with_time_ranges,
         sst::{
@@ -1317,4 +1359,24 @@ mod tests {
         ];
         check_collect_column_stats(&schema, vec![3, 5], meta_datas);
     }
+
+    #[test]
+    fn test_frequent_flush() {
+        let now = time_ext::current_time_millis();
+        let cases = vec![
+            (now - 1000, 100, false),
+            (now - 1000, 2000, true),
+            (now - 10000, 200, false),
+            (now - 2000, 2000, false),
+            (now + 2000, 1000, true),
+        ];
+        for (last_flush_time_ms, min_flush_interval_ms, expect) in cases {
+            let checker = FrequentFlushChecker {
+                min_flush_interval_ms,
+                last_flush_time_ms,
+            };
+
+            assert_eq!(expect, checker.is_frequent_flush());
+        }
+    }
 }
diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs
index 4b70a3f0..031f867e 100644
--- a/analytic_engine/src/instance/mod.rs
+++ b/analytic_engine/src/instance/mod.rs
@@ -176,6 +176,8 @@ pub struct Instance {
     pub(crate) replay_batch_size: usize,
     /// Write sst max buffer size
     pub(crate) write_sst_max_buffer_size: usize,
+    /// The min interval between flushes
+    pub(crate) min_flush_interval: ReadableDuration,
     /// Max retry limit to flush memtables
     pub(crate) max_retry_flush_limit: usize,
     /// Max bytes per write batch
@@ -304,6 +306,18 @@ impl Instance {
             // Do flush in write runtime
             runtime: self.runtimes.write_runtime.clone(),
             write_sst_max_buffer_size: self.write_sst_max_buffer_size,
+            min_flush_interval_ms: None,
+        }
+    }
+
+    #[inline]
+    fn make_flusher_with_min_interval(&self) -> Flusher {
+        Flusher {
+            space_store: self.space_store.clone(),
+            // Do flush in write runtime
+            runtime: self.runtimes.write_runtime.clone(),
+            write_sst_max_buffer_size: self.write_sst_max_buffer_size,
+            min_flush_interval_ms: Some(self.min_flush_interval.as_millis()),
         }
     }
 
diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs
index 446d3633..6fcf29d0 100644
--- a/analytic_engine/src/instance/open.rs
+++ b/analytic_engine/src/instance/open.rs
@@ -110,6 +110,7 @@ impl Instance {
             compaction_runtime,
             scheduler_config,
             ctx.config.write_sst_max_buffer_size.as_byte() as usize,
+            ctx.config.min_flush_interval.as_millis(),
             scan_options_for_compaction,
         ));
 
@@ -137,6 +138,7 @@ impl Instance {
             space_write_buffer_size: ctx.config.space_write_buffer_size,
             replay_batch_size: ctx.config.replay_batch_size,
             write_sst_max_buffer_size: 
ctx.config.write_sst_max_buffer_size.as_byte() as usize,
+            min_flush_interval: ctx.config.min_flush_interval,
             max_retry_flush_limit: ctx.config.max_retry_flush_limit,
             mem_usage_sampling_interval: 
ctx.config.mem_usage_sampling_interval,
             max_bytes_per_write_batch: ctx
diff --git a/analytic_engine/src/instance/serial_executor.rs b/analytic_engine/src/instance/serial_executor.rs
index 579f6892..b5187048 100644
--- a/analytic_engine/src/instance/serial_executor.rs
+++ b/analytic_engine/src/instance/serial_executor.rs
@@ -166,11 +166,7 @@ impl TableFlushScheduler {
                         *flush_state = FlushState::Flushing;
                         break;
                     }
-                    FlushState::Flushing => {
-                        if !block_on_write_thread {
-                            return Ok(());
-                        }
-                    }
+                    FlushState::Flushing => {}
                     FlushState::Failed { err_msg } => {
                         if self
                             .schedule_sync
diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs
index 41ea3fb8..082b66d5 100644
--- a/analytic_engine/src/instance/wal_replayer.rs
+++ b/analytic_engine/src/instance/wal_replayer.rs
@@ -542,7 +542,8 @@ async fn replay_table_log_entries(
                 }
 
                 // Flush the table if necessary.
-                if table_data.should_flush_table(serial_exec) {
+                let in_flush = serial_exec.flush_scheduler().is_in_flush();
+                if table_data.should_flush_table(in_flush) {
                     let opts = TableFlushOptions {
                         res_sender: None,
                         max_retry_flush_limit,
diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs
index ed738f86..e49ccaac 100644
--- a/analytic_engine/src/instance/write.rs
+++ b/analytic_engine/src/instance/write.rs
@@ -620,7 +620,8 @@ impl<'a> Writer<'a> {
             }
         }
 
-        if self.table_data.should_flush_table(self.serial_exec) {
+        let in_flush = self.serial_exec.flush_scheduler().is_in_flush();
+        if self.table_data.should_flush_table(in_flush) {
             let table_data = self.table_data.clone();
             let _timer = 
table_data.metrics.start_table_write_flush_wait_timer();
             self.handle_memtable_flush(&table_data).await?;
@@ -673,7 +674,7 @@ impl<'a> Writer<'a> {
             res_sender: None,
             max_retry_flush_limit: self.instance.max_retry_flush_limit(),
         };
-        let flusher = self.instance.make_flusher();
+        let flusher = self.instance.make_flusher_with_min_interval();
         if table_data.id == self.table_data.id {
             let flush_scheduler = self.serial_exec.flush_scheduler();
             // Set `block_on_write_thread` to false and let flush do in 
background.
diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs
index 64ba8942..4e951c34 100644
--- a/analytic_engine/src/lib.rs
+++ b/analytic_engine/src/lib.rs
@@ -97,6 +97,8 @@ pub struct Config {
     pub write_sst_max_buffer_size: ReadableSize,
     /// Max retry limit After flush failed
     pub max_retry_flush_limit: usize,
+    /// The min interval between two consecutive flushes
+    pub min_flush_interval: ReadableDuration,
     /// Max bytes per write batch.
     ///
     /// If this is set, the atomicity of write request will be broken.
@@ -185,6 +187,7 @@ impl Default for Config {
             scan_max_record_batches_in_flight: 1024,
             write_sst_max_buffer_size: ReadableSize::mb(10),
             max_retry_flush_limit: 0,
+            min_flush_interval: ReadableDuration::minutes(1),
             max_bytes_per_write_batch: None,
             mem_usage_sampling_interval: ReadableDuration::secs(0),
             wal_encode: WalEncodeConfig::default(),
diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs
index 29286042..2c011a9c 100644
--- a/analytic_engine/src/table/data.rs
+++ b/analytic_engine/src/table/data.rs
@@ -584,9 +584,7 @@ impl TableData {
     }
 
     /// Returns true if the memory usage of this table reaches flush threshold
-    ///
-    /// REQUIRE: Do in write worker
-    pub fn should_flush_table(&self, serial_exec: &mut TableOpSerialExecutor) 
-> bool {
+    pub fn should_flush_table(&self, in_flush: bool) -> bool {
         // Fallback to usize::MAX if Failed to convert arena_block_size into
         // usize (overflow)
         let max_write_buffer_size = self
@@ -602,7 +600,6 @@ impl TableData {
 
         let mutable_usage = self.current_version.mutable_memory_usage();
         let total_usage = self.current_version.total_memory_usage();
-        let in_flush = serial_exec.flush_scheduler().is_in_flush();
         // Inspired by 
https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h#L94
         if mutable_usage > mutable_limit && !in_flush {
             info!(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to