jiacai2050 commented on code in PR #1610:
URL: https://github.com/apache/horaedb/pull/1610#discussion_r1889542505
##########
src/metric_engine/src/compaction/executor.rs:
##########
@@ -31,70 +34,137 @@ use tracing::error;
use crate::{
compaction::Task,
- manifest::ManifestRef,
+ ensure,
+ manifest::{ManifestRef, ManifestUpdate},
read::ParquetReader,
- sst::{allocate_id, FileMeta, SstPathGenerator},
- types::{ObjectStoreRef, StorageSchema},
+ sst::{allocate_id, FileMeta, SstFile, SstPathGenerator},
+ types::{ObjectStoreRef, RuntimeRef, StorageSchema},
Result,
};
#[derive(Clone)]
-pub struct Runner {
+pub struct Executor {
+ inner: Arc<Inner>,
+}
+
+struct Inner {
+ runtime: RuntimeRef,
store: ObjectStoreRef,
schema: StorageSchema,
manifest: ManifestRef,
sst_path_gen: Arc<SstPathGenerator>,
parquet_reader: Arc<ParquetReader>,
write_props: WriterProperties,
+ inused_memory: AtomicU64,
+ mem_limit: u64,
}
-impl Runner {
+impl Executor {
pub fn new(
+ runtime: RuntimeRef,
store: ObjectStoreRef,
schema: StorageSchema,
manifest: ManifestRef,
sst_path_gen: Arc<SstPathGenerator>,
parquet_reader: Arc<ParquetReader>,
write_props: WriterProperties,
+ mem_limit: u64,
) -> Self {
- Self {
+ let inner = Inner {
+ runtime,
store,
schema,
manifest,
sst_path_gen,
parquet_reader,
write_props,
+ mem_limit,
+ inused_memory: AtomicU64::new(0),
+ };
+ Self {
+ inner: Arc::new(inner),
}
}
- // TODO: Merge input sst files into one new sst file
- // and delete the expired sst files
- pub async fn do_compaction(&self, task: Task) -> Result<()> {
+ fn pre_check(&self, task: &Task) -> Result<()> {
assert!(!task.inputs.is_empty());
for f in &task.inputs {
assert!(f.is_compaction());
}
for f in &task.expireds {
assert!(f.is_compaction());
}
+
+ let task_size = task.input_size();
+ let inused = self.inner.inused_memory.load(Ordering::Relaxed);
+ let mem_limit = self.inner.mem_limit;
+ ensure!(
+ inused + task_size > mem_limit,
+ "Compaction memory usage too high, inused:{inused},
task_size:{task_size}, limit:{mem_limit}"
+ );
+
+ self.inner
+ .inused_memory
+ .fetch_add(task.input_size(), Ordering::Relaxed);
+ Ok(())
+ }
+
+ pub fn on_success(&self, task: &Task) {
+ let task_size = task.input_size();
+ self.inner
+ .inused_memory
+ .fetch_add(task_size, Ordering::Relaxed);
Review Comment:
I will fix it in a follow-up PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]