This is an automated email from the ASF dual-hosted git repository.
tanruixiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/dev by this push:
new 3f5d8f45 fix: no write stall (#1388)
3f5d8f45 is described below
commit 3f5d8f450168815671c28858b4ec74378ce4f633
Author: WEI Xikai <[email protected]>
AuthorDate: Thu Dec 21 17:42:35 2023 +0800
fix: no write stall (#1388)
## Rationale
#1003 tries to avoid frequent flush requests, which may generate massive
numbers of small SSTs, but it also removed the write stall from the normal
write path.
## Detailed Changes
Introduce the `min_flush_interval` to avoid frequent flush requests and
restore the write stall mechanism.
## Test Plan
Add unit tests for the frequent flush check.
---
analytic_engine/src/compaction/scheduler.rs | 4 ++
analytic_engine/src/instance/flush_compaction.rs | 64 +++++++++++++++++++++++-
analytic_engine/src/instance/mod.rs | 14 ++++++
analytic_engine/src/instance/open.rs | 2 +
analytic_engine/src/instance/serial_executor.rs | 6 +--
analytic_engine/src/instance/wal_replayer.rs | 3 +-
analytic_engine/src/instance/write.rs | 5 +-
analytic_engine/src/lib.rs | 3 ++
analytic_engine/src/table/data.rs | 5 +-
9 files changed, 93 insertions(+), 13 deletions(-)
diff --git a/analytic_engine/src/compaction/scheduler.rs
b/analytic_engine/src/compaction/scheduler.rs
index 72fe2f8b..d242b443 100644
--- a/analytic_engine/src/compaction/scheduler.rs
+++ b/analytic_engine/src/compaction/scheduler.rs
@@ -306,6 +306,7 @@ impl SchedulerImpl {
runtime: Arc<Runtime>,
config: SchedulerConfig,
write_sst_max_buffer_size: usize,
+ min_flush_interval_ms: u64,
scan_options: ScanOptions,
) -> Self {
let (tx, rx) = mpsc::channel(config.schedule_channel_len);
@@ -321,6 +322,7 @@ impl SchedulerImpl {
max_ongoing_tasks: config.max_ongoing_tasks,
max_unflushed_duration: config.max_unflushed_duration.0,
write_sst_max_buffer_size,
+ min_flush_interval_ms,
scan_options,
limit: Arc::new(OngoingTaskLimit {
ongoing_tasks: AtomicUsize::new(0),
@@ -399,6 +401,7 @@ struct ScheduleWorker {
picker_manager: PickerManager,
max_ongoing_tasks: usize,
write_sst_max_buffer_size: usize,
+ min_flush_interval_ms: u64,
scan_options: ScanOptions,
limit: Arc<OngoingTaskLimit>,
running: Arc<AtomicBool>,
@@ -665,6 +668,7 @@ impl ScheduleWorker {
space_store: self.space_store.clone(),
runtime: self.runtime.clone(),
write_sst_max_buffer_size: self.write_sst_max_buffer_size,
+ min_flush_interval_ms: Some(self.min_flush_interval_ms),
};
for table_data in &tables_buf {
diff --git a/analytic_engine/src/instance/flush_compaction.rs
b/analytic_engine/src/instance/flush_compaction.rs
index cbede944..7094bb35 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -213,6 +213,9 @@ pub struct Flusher {
pub runtime: RuntimeRef,
pub write_sst_max_buffer_size: usize,
+ /// If the interval is set, it will generate a [`FlushTask`] with min flush
+ /// interval check.
+ pub min_flush_interval_ms: Option<u64>,
}
struct FlushTask {
@@ -220,6 +223,22 @@ struct FlushTask {
table_data: TableDataRef,
runtime: RuntimeRef,
write_sst_max_buffer_size: usize,
+ // If the interval is set, it will be used to check whether flush is too
frequent.
+ min_flush_interval_ms: Option<u64>,
+}
+
+/// The checker to determine whether a flush is frequent.
+struct FrequentFlushChecker {
+ min_flush_interval_ms: u64,
+ last_flush_time_ms: u64,
+}
+
+impl FrequentFlushChecker {
+ #[inline]
+ fn is_frequent_flush(&self) -> bool {
+ let now = time_ext::current_time_millis();
+ self.last_flush_time_ms + self.min_flush_interval_ms > now
+ }
}
impl Flusher {
@@ -268,6 +287,7 @@ impl Flusher {
space_store: self.space_store.clone(),
runtime: self.runtime.clone(),
write_sst_max_buffer_size: self.write_sst_max_buffer_size,
+ min_flush_interval_ms: self.min_flush_interval_ms,
};
let flush_job = async move { flush_task.run().await };
@@ -281,6 +301,16 @@ impl FlushTask {
/// Each table can only have one running flush task at the same time, which
/// should be ensured by the caller.
async fn run(&self) -> Result<()> {
+ let large_enough = self.table_data.should_flush_table(false);
+ if !large_enough && self.is_frequent_flush() {
+ debug!(
+ "Ignore flush task for too frequent flush of small memtable,
table:{}",
+ self.table_data.name
+ );
+
+ return Ok(());
+ }
+
let instant = Instant::now();
let flush_req = self.preprocess_flush(&self.table_data).await?;
@@ -320,6 +350,18 @@ impl FlushTask {
Ok(())
}
+ fn is_frequent_flush(&self) -> bool {
+ if let Some(min_flush_interval_ms) = self.min_flush_interval_ms {
+ let checker = FrequentFlushChecker {
+ min_flush_interval_ms,
+ last_flush_time_ms: self.table_data.last_flush_time(),
+ };
+ checker.is_frequent_flush()
+ } else {
+ false
+ }
+ }
+
async fn preprocess_flush(&self, table_data: &TableDataRef) ->
Result<TableFlushRequest> {
let current_version = table_data.current_version();
let mut last_sequence = table_data.last_sequence();
@@ -1190,7 +1232,7 @@ mod tests {
time::TimeRange,
};
- use super::collect_column_stats_from_meta_datas;
+ use super::{collect_column_stats_from_meta_datas, FrequentFlushChecker};
use crate::{
instance::flush_compaction::split_record_batch_with_time_ranges,
sst::{
@@ -1317,4 +1359,24 @@ mod tests {
];
check_collect_column_stats(&schema, vec![3, 5], meta_datas);
}
+
+ #[test]
+ fn test_frequent_flush() {
+ let now = time_ext::current_time_millis();
+ let cases = vec![
+ (now - 1000, 100, false),
+ (now - 1000, 2000, true),
+ (now - 10000, 200, false),
+ (now - 2000, 2000, false),
+ (now + 2000, 1000, true),
+ ];
+ for (last_flush_time_ms, min_flush_interval_ms, expect) in cases {
+ let checker = FrequentFlushChecker {
+ min_flush_interval_ms,
+ last_flush_time_ms,
+ };
+
+ assert_eq!(expect, checker.is_frequent_flush());
+ }
+ }
}
diff --git a/analytic_engine/src/instance/mod.rs
b/analytic_engine/src/instance/mod.rs
index 4b70a3f0..031f867e 100644
--- a/analytic_engine/src/instance/mod.rs
+++ b/analytic_engine/src/instance/mod.rs
@@ -176,6 +176,8 @@ pub struct Instance {
pub(crate) replay_batch_size: usize,
/// Write sst max buffer size
pub(crate) write_sst_max_buffer_size: usize,
+ /// The min interval between flushes
+ pub(crate) min_flush_interval: ReadableDuration,
/// Max retry limit to flush memtables
pub(crate) max_retry_flush_limit: usize,
/// Max bytes per write batch
@@ -304,6 +306,18 @@ impl Instance {
// Do flush in write runtime
runtime: self.runtimes.write_runtime.clone(),
write_sst_max_buffer_size: self.write_sst_max_buffer_size,
+ min_flush_interval_ms: None,
+ }
+ }
+
+ #[inline]
+ fn make_flusher_with_min_interval(&self) -> Flusher {
+ Flusher {
+ space_store: self.space_store.clone(),
+ // Do flush in write runtime
+ runtime: self.runtimes.write_runtime.clone(),
+ write_sst_max_buffer_size: self.write_sst_max_buffer_size,
+ min_flush_interval_ms: Some(self.min_flush_interval.as_millis()),
}
}
diff --git a/analytic_engine/src/instance/open.rs
b/analytic_engine/src/instance/open.rs
index 446d3633..6fcf29d0 100644
--- a/analytic_engine/src/instance/open.rs
+++ b/analytic_engine/src/instance/open.rs
@@ -110,6 +110,7 @@ impl Instance {
compaction_runtime,
scheduler_config,
ctx.config.write_sst_max_buffer_size.as_byte() as usize,
+ ctx.config.min_flush_interval.as_millis(),
scan_options_for_compaction,
));
@@ -137,6 +138,7 @@ impl Instance {
space_write_buffer_size: ctx.config.space_write_buffer_size,
replay_batch_size: ctx.config.replay_batch_size,
write_sst_max_buffer_size:
ctx.config.write_sst_max_buffer_size.as_byte() as usize,
+ min_flush_interval: ctx.config.min_flush_interval,
max_retry_flush_limit: ctx.config.max_retry_flush_limit,
mem_usage_sampling_interval:
ctx.config.mem_usage_sampling_interval,
max_bytes_per_write_batch: ctx
diff --git a/analytic_engine/src/instance/serial_executor.rs
b/analytic_engine/src/instance/serial_executor.rs
index 579f6892..b5187048 100644
--- a/analytic_engine/src/instance/serial_executor.rs
+++ b/analytic_engine/src/instance/serial_executor.rs
@@ -166,11 +166,7 @@ impl TableFlushScheduler {
*flush_state = FlushState::Flushing;
break;
}
- FlushState::Flushing => {
- if !block_on_write_thread {
- return Ok(());
- }
- }
+ FlushState::Flushing => {}
FlushState::Failed { err_msg } => {
if self
.schedule_sync
diff --git a/analytic_engine/src/instance/wal_replayer.rs
b/analytic_engine/src/instance/wal_replayer.rs
index 41ea3fb8..082b66d5 100644
--- a/analytic_engine/src/instance/wal_replayer.rs
+++ b/analytic_engine/src/instance/wal_replayer.rs
@@ -542,7 +542,8 @@ async fn replay_table_log_entries(
}
// Flush the table if necessary.
- if table_data.should_flush_table(serial_exec) {
+ let in_flush = serial_exec.flush_scheduler().is_in_flush();
+ if table_data.should_flush_table(in_flush) {
let opts = TableFlushOptions {
res_sender: None,
max_retry_flush_limit,
diff --git a/analytic_engine/src/instance/write.rs
b/analytic_engine/src/instance/write.rs
index ed738f86..e49ccaac 100644
--- a/analytic_engine/src/instance/write.rs
+++ b/analytic_engine/src/instance/write.rs
@@ -620,7 +620,8 @@ impl<'a> Writer<'a> {
}
}
- if self.table_data.should_flush_table(self.serial_exec) {
+ let in_flush = self.serial_exec.flush_scheduler().is_in_flush();
+ if self.table_data.should_flush_table(in_flush) {
let table_data = self.table_data.clone();
let _timer =
table_data.metrics.start_table_write_flush_wait_timer();
self.handle_memtable_flush(&table_data).await?;
@@ -673,7 +674,7 @@ impl<'a> Writer<'a> {
res_sender: None,
max_retry_flush_limit: self.instance.max_retry_flush_limit(),
};
- let flusher = self.instance.make_flusher();
+ let flusher = self.instance.make_flusher_with_min_interval();
if table_data.id == self.table_data.id {
let flush_scheduler = self.serial_exec.flush_scheduler();
// Set `block_on_write_thread` to false and let flush do in
background.
diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs
index 64ba8942..4e951c34 100644
--- a/analytic_engine/src/lib.rs
+++ b/analytic_engine/src/lib.rs
@@ -97,6 +97,8 @@ pub struct Config {
pub write_sst_max_buffer_size: ReadableSize,
/// Max retry limit After flush failed
pub max_retry_flush_limit: usize,
+ /// The min interval between two consecutive flushes
+ pub min_flush_interval: ReadableDuration,
/// Max bytes per write batch.
///
/// If this is set, the atomicity of write request will be broken.
@@ -185,6 +187,7 @@ impl Default for Config {
scan_max_record_batches_in_flight: 1024,
write_sst_max_buffer_size: ReadableSize::mb(10),
max_retry_flush_limit: 0,
+ min_flush_interval: ReadableDuration::minutes(1),
max_bytes_per_write_batch: None,
mem_usage_sampling_interval: ReadableDuration::secs(0),
wal_encode: WalEncodeConfig::default(),
diff --git a/analytic_engine/src/table/data.rs
b/analytic_engine/src/table/data.rs
index 29286042..2c011a9c 100644
--- a/analytic_engine/src/table/data.rs
+++ b/analytic_engine/src/table/data.rs
@@ -584,9 +584,7 @@ impl TableData {
}
/// Returns true if the memory usage of this table reaches flush threshold
- ///
- /// REQUIRE: Do in write worker
- pub fn should_flush_table(&self, serial_exec: &mut TableOpSerialExecutor)
-> bool {
+ pub fn should_flush_table(&self, in_flush: bool) -> bool {
// Fallback to usize::MAX if Failed to convert arena_block_size into
// usize (overflow)
let max_write_buffer_size = self
@@ -602,7 +600,6 @@ impl TableData {
let mutable_usage = self.current_version.mutable_memory_usage();
let total_usage = self.current_version.total_memory_usage();
- let in_flush = serial_exec.flush_scheduler().is_in_flush();
// Inspired by
https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h#L94
if mutable_usage > mutable_limit && !in_flush {
info!(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]