This is an automated email from the ASF dual-hosted git repository.
baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 03b1df93 feat: manifest support delete (#1610)
03b1df93 is described below
commit 03b1df935b1df0aafbc40c1909805d8ca64e4063
Author: Jiacai Liu <[email protected]>
AuthorDate: Wed Dec 18 10:43:20 2024 +0800
feat: manifest support delete (#1610)
## Rationale
When a compaction finishes, we need to delete the old input SSTs and the
expired SSTs.
## Detailed Changes
- The delta file uses the `ManifestUpdate` struct.
- Refactor the compaction scheduler to make it more modular.
## Test Plan
CI
---
.github/workflows/ci.yml | 4 +-
Makefile | 3 +-
.../src/compaction/{runner.rs => executor.rs} | 159 +++++--
src/metric_engine/src/compaction/mod.rs | 13 +-
src/metric_engine/src/compaction/picker.rs | 56 ++-
src/metric_engine/src/compaction/scheduler.rs | 109 ++---
src/metric_engine/src/lib.rs | 1 +
src/metric_engine/src/manifest.rs | 491 +++++++--------------
src/metric_engine/src/manifest/encoding.rs | 68 +++
src/metric_engine/src/sst.rs | 4 +
src/metric_engine/src/storage.rs | 171 +++----
src/metric_engine/src/{lib.rs => util.rs} | 22 +-
src/pb_types/protos/sst.proto | 8 +-
13 files changed, 564 insertions(+), 545 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c88221af..73775bc6 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -67,6 +67,7 @@ jobs:
- name: Install check binaries
run: |
cargo install --git https://github.com/DevinR528/cargo-sort --rev
55ec890 --locked
+ - uses: Swatinem/rust-cache@v2
- name: Run Style Check
run: |
make fmt sort clippy
@@ -80,8 +81,6 @@ jobs:
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- with:
- submodules: true
- name: Release Disk Quota
run: |
sudo make ensure-disk-quota
@@ -89,6 +88,7 @@ jobs:
run: |
sudo apt update
sudo apt install --yes protobuf-compiler
+ - uses: Swatinem/rust-cache@v2
- name: Run Unit Tests
run: |
make test
diff --git a/Makefile b/Makefile
index b39aeb18..6b4f287f 100644
--- a/Makefile
+++ b/Makefile
@@ -47,8 +47,7 @@ udeps:
cd $(DIR); cargo udeps --all-targets --all-features --workspace
clippy:
- cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D
warnings -D clippy::dbg-macro \
- -A dead_code -A unused_variables -A clippy::unreachable -A
clippy::too_many_arguments # Remove these once we have a clean build
+ cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D
warnings -D clippy::dbg-macro -A clippy::too-many-arguments
ensure-disk-quota:
bash ./scripts/free-disk-space.sh
diff --git a/src/metric_engine/src/compaction/runner.rs
b/src/metric_engine/src/compaction/executor.rs
similarity index 55%
rename from src/metric_engine/src/compaction/runner.rs
rename to src/metric_engine/src/compaction/executor.rs
index ce6f170e..9f41ec82 100644
--- a/src/metric_engine/src/compaction/runner.rs
+++ b/src/metric_engine/src/compaction/executor.rs
@@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::Arc;
+use std::sync::{
+ atomic::{AtomicU64, Ordering},
+ Arc,
+};
use anyhow::Context;
use arrow::array::{RecordBatch, UInt64Array};
@@ -31,45 +34,59 @@ use tracing::error;
use crate::{
compaction::Task,
- manifest::ManifestRef,
+ ensure,
+ manifest::{ManifestRef, ManifestUpdate},
read::ParquetReader,
- sst::{allocate_id, FileMeta, SstPathGenerator},
- types::{ObjectStoreRef, StorageSchema},
+ sst::{allocate_id, FileMeta, SstFile, SstPathGenerator},
+ types::{ObjectStoreRef, RuntimeRef, StorageSchema},
Result,
};
#[derive(Clone)]
-pub struct Runner {
+pub struct Executor {
+ inner: Arc<Inner>,
+}
+
+struct Inner {
+ runtime: RuntimeRef,
store: ObjectStoreRef,
schema: StorageSchema,
manifest: ManifestRef,
sst_path_gen: Arc<SstPathGenerator>,
parquet_reader: Arc<ParquetReader>,
write_props: WriterProperties,
+ inused_memory: AtomicU64,
+ mem_limit: u64,
}
-impl Runner {
+impl Executor {
pub fn new(
+ runtime: RuntimeRef,
store: ObjectStoreRef,
schema: StorageSchema,
manifest: ManifestRef,
sst_path_gen: Arc<SstPathGenerator>,
parquet_reader: Arc<ParquetReader>,
write_props: WriterProperties,
+ mem_limit: u64,
) -> Self {
- Self {
+ let inner = Inner {
+ runtime,
store,
schema,
manifest,
sst_path_gen,
parquet_reader,
write_props,
+ mem_limit,
+ inused_memory: AtomicU64::new(0),
+ };
+ Self {
+ inner: Arc::new(inner),
}
}
- // TODO: Merge input sst files into one new sst file
- // and delete the expired sst files
- pub async fn do_compaction(&self, task: Task) -> Result<()> {
+ fn pre_check(&self, task: &Task) -> Result<()> {
assert!(!task.inputs.is_empty());
for f in &task.inputs {
assert!(f.is_compaction());
@@ -77,24 +94,77 @@ impl Runner {
for f in &task.expireds {
assert!(f.is_compaction());
}
+
+ let task_size = task.input_size();
+ let inused = self.inner.inused_memory.load(Ordering::Relaxed);
+ let mem_limit = self.inner.mem_limit;
+ ensure!(
+ inused + task_size > mem_limit,
+ "Compaction memory usage too high, inused:{inused},
task_size:{task_size}, limit:{mem_limit}"
+ );
+
+ self.inner
+ .inused_memory
+ .fetch_add(task.input_size(), Ordering::Relaxed);
+ Ok(())
+ }
+
+ pub fn on_success(&self, task: &Task) {
+ let task_size = task.input_size();
+ self.inner
+ .inused_memory
+ .fetch_add(task_size, Ordering::Relaxed);
+ }
+
+ pub fn on_failure(&self, task: &Task) {
+ let task_size = task.input_size();
+ self.inner
+ .inused_memory
+ .fetch_sub(task_size, Ordering::Relaxed);
+
+ // When task execution fails, unmark sst so they can be
+ // reschduled.
+ for sst in &task.inputs {
+ sst.unmark_compaction();
+ }
+ for sst in &task.expireds {
+ sst.unmark_compaction();
+ }
+ }
+
+ pub fn submit(&self, task: Task) {
+ let runnable = Runnable {
+ executor: self.clone(),
+ task,
+ };
+ runnable.run()
+ }
+
+ // TODO: Merge input sst files into one new sst file
+ // and delete the expired sst files
+ pub async fn do_compaction(&self, task: &Task) -> Result<()> {
+ self.pre_check(task)?;
+
let mut time_range = task.inputs[0].meta().time_range.clone();
for f in &task.inputs[1..] {
time_range.merge(&f.meta().time_range);
}
- let plan = self
- .parquet_reader
- .build_df_plan(task.inputs.clone(), None, Vec::new())?;
+ let plan =
+ self.inner
+ .parquet_reader
+ .build_df_plan(task.inputs.clone(), None, Vec::new())?;
let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))
.context("execute datafusion plan")?;
let file_id = allocate_id();
- let file_path = self.sst_path_gen.generate(file_id);
+ let file_path = self.inner.sst_path_gen.generate(file_id);
let file_path = Path::from(file_path);
- let object_store_writer = ParquetObjectWriter::new(self.store.clone(),
file_path.clone());
+ let object_store_writer =
+ ParquetObjectWriter::new(self.inner.store.clone(),
file_path.clone());
let mut writer = AsyncArrowWriter::try_new(
object_store_writer,
- self.schema.arrow_schema.clone(),
- Some(self.write_props.clone()),
+ self.inner.schema.arrow_schema.clone(),
+ Some(self.inner.write_props.clone()),
)
.context("create arrow writer")?;
let mut num_rows = 0;
@@ -107,7 +177,7 @@ impl Runner {
// Since file_id in increasing order, we can use it as
sequence.
let seq_column = Arc::new(UInt64Array::from(vec![file_id;
batch.num_rows()]));
new_cols.push(seq_column);
- RecordBatch::try_new(self.schema.arrow_schema.clone(),
new_cols)
+ RecordBatch::try_new(self.inner.schema.arrow_schema.clone(),
new_cols)
.context("construct record batch with seq column")?
};
@@ -115,6 +185,7 @@ impl Runner {
}
writer.close().await.context("close writer")?;
let object_meta = self
+ .inner
.store
.head(&file_path)
.await
@@ -126,28 +197,37 @@ impl Runner {
time_range: time_range.clone(),
};
// First add new sst to manifest, then delete expired/old sst
- self.manifest.add_file(file_id, file_meta).await?;
- self.manifest
- .add_tombstone_files(task.expireds.clone())
- .await?;
- self.manifest
- .add_tombstone_files(task.inputs.clone())
+ let to_adds = vec![SstFile::new(file_id, file_meta)];
+ let to_deletes = task
+ .expireds
+ .iter()
+ .map(|f| f.id())
+ .chain(task.inputs.iter().map(|f| f.id()))
+ .collect();
+ self.inner
+ .manifest
+ .update(ManifestUpdate::new(to_adds, to_deletes))
.await?;
+ // From now on, no error should be returned!
+ // Because we have already updated manifest.
+
let (_, results) = TokioScope::scope_and_block(|scope| {
- for file in task.expireds {
- let path = Path::from(self.sst_path_gen.generate(file.id()));
+ for file in &task.expireds {
+ let path =
Path::from(self.inner.sst_path_gen.generate(file.id()));
scope.spawn(async move {
- self.store
+ self.inner
+ .store
.delete(&path)
.await
.with_context(|| format!("failed to delete file,
path:{path}"))
});
}
- for file in task.inputs {
- let path = Path::from(self.sst_path_gen.generate(file.id()));
+ for file in &task.inputs {
+ let path =
Path::from(self.inner.sst_path_gen.generate(file.id()));
scope.spawn(async move {
- self.store
+ self.inner
+ .store
.delete(&path)
.await
.with_context(|| format!("failed to delete file,
path:{path}"))
@@ -170,3 +250,22 @@ impl Runner {
Ok(())
}
}
+
+pub struct Runnable {
+ executor: Executor,
+ task: Task,
+}
+
+impl Runnable {
+ fn run(self) {
+ let rt = self.executor.inner.runtime.clone();
+ rt.spawn(async move {
+ if let Err(e) = self.executor.do_compaction(&self.task).await {
+ error!("Do compaction failed, err:{e}");
+ self.executor.on_failure(&self.task);
+ } else {
+ self.executor.on_success(&self.task);
+ }
+ });
+ }
+}
diff --git a/src/metric_engine/src/compaction/mod.rs
b/src/metric_engine/src/compaction/mod.rs
index 62beda0a..38a37ebe 100644
--- a/src/metric_engine/src/compaction/mod.rs
+++ b/src/metric_engine/src/compaction/mod.rs
@@ -15,21 +15,22 @@
// specific language governing permissions and limitations
// under the License.
+mod executor;
mod picker;
-mod runner;
mod scheduler;
pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig};
use crate::sst::SstFile;
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct Input {
- files: Vec<SstFile>,
-}
-
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Task {
pub inputs: Vec<SstFile>,
pub expireds: Vec<SstFile>,
}
+
+impl Task {
+ pub fn input_size(&self) -> u64 {
+ self.inputs.iter().map(|f| f.size() as u64).sum()
+ }
+}
diff --git a/src/metric_engine/src/compaction/picker.rs
b/src/metric_engine/src/compaction/picker.rs
index 913ad317..9107e3ba 100644
--- a/src/metric_engine/src/compaction/picker.rs
+++ b/src/metric_engine/src/compaction/picker.rs
@@ -19,19 +19,56 @@ use std::{collections::BTreeMap, time::Duration};
use tracing::debug;
-use super::SchedulerConfig;
-use crate::{compaction::Task, sst::SstFile, types::Timestamp};
+use crate::{compaction::Task, manifest::ManifestRef, sst::SstFile,
types::Timestamp, util::now};
+
+pub struct Picker {
+ manifest: ManifestRef,
+ ttl: Option<Duration>,
+ strategy: TimeWindowCompactionStrategy,
+}
+
+impl Picker {
+ pub fn new(
+ manifest: ManifestRef,
+ ttl: Option<Duration>,
+ segment_duration: Duration,
+ new_sst_max_size: u64,
+ input_sst_max_num: usize,
+ ) -> Self {
+ Self {
+ manifest,
+ ttl,
+ strategy: TimeWindowCompactionStrategy::new(
+ segment_duration,
+ new_sst_max_size,
+ input_sst_max_num,
+ ),
+ }
+ }
+
+ pub async fn pick_candidate(&self) -> Option<Task> {
+ let ssts = self.manifest.all_ssts().await;
+ let expire_time = self.ttl.map(|ttl| (now() - ttl.as_micros() as
i64).into());
+ self.strategy.pick_candidate(ssts, expire_time)
+ }
+}
pub struct TimeWindowCompactionStrategy {
segment_duration: Duration,
- config: SchedulerConfig,
+ new_sst_max_size: u64,
+ input_sst_max_num: usize,
}
impl TimeWindowCompactionStrategy {
- pub fn new(segment_duration: Duration, config: SchedulerConfig) -> Self {
+ pub fn new(
+ segment_duration: Duration,
+ new_sst_max_size: u64,
+ input_sst_max_num: usize,
+ ) -> Self {
Self {
segment_duration,
- config,
+ new_sst_max_size,
+ input_sst_max_num,
}
}
@@ -120,12 +157,12 @@ impl TimeWindowCompactionStrategy {
files.sort_unstable_by_key(SstFile::size);
let mut input_size = 0;
- let memory_limit = self.config.memory_limit;
- let compaction_files_limit = self.config.compaction_files_limit;
+ // Suppose the comaction will reduce the size of files by 10%.
+ let memory_limit = (self.new_sst_max_size as f64 * 1.1) as u64;
let compaction_files = files
.into_iter()
- .take(compaction_files_limit)
+ .take(self.input_sst_max_num)
.take_while(|f| {
input_size += f.size() as u64;
input_size <= memory_limit
@@ -154,8 +191,7 @@ mod tests {
#[test]
fn test_pick_candidate() {
let segment_duration = Duration::from_millis(20);
- let config = SchedulerConfig::default();
- let strategy = TimeWindowCompactionStrategy::new(segment_duration,
config);
+ let strategy = TimeWindowCompactionStrategy::new(segment_duration,
9999, 10);
let ssts = (0_i64..5_i64)
.map(|i| {
diff --git a/src/metric_engine/src/compaction/scheduler.rs
b/src/metric_engine/src/compaction/scheduler.rs
index 5e2c407a..4832bf96 100644
--- a/src/metric_engine/src/compaction/scheduler.rs
+++ b/src/metric_engine/src/compaction/scheduler.rs
@@ -15,12 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use std::{
- sync::{atomic::AtomicU64, Arc},
- time::Duration,
-};
+use std::{sync::Arc, time::Duration};
-use anyhow::Context;
use parquet::file::properties::WriterProperties;
use tokio::{
sync::mpsc::{self, Receiver, Sender},
@@ -29,21 +25,20 @@ use tokio::{
};
use tracing::warn;
-use super::runner::Runner;
+use super::{executor::Executor, picker::Picker};
use crate::{
- compaction::{picker::TimeWindowCompactionStrategy, Task},
+ compaction::Task,
manifest::ManifestRef,
read::ParquetReader,
sst::SstPathGenerator,
types::{ObjectStoreRef, RuntimeRef, StorageSchema},
- Result,
};
+#[allow(dead_code)]
pub struct Scheduler {
runtime: RuntimeRef,
task_tx: Sender<Task>,
- inused_memory: AtomicU64,
task_handle: JoinHandle<()>,
picker_handle: JoinHandle<()>,
}
@@ -61,29 +56,35 @@ impl Scheduler {
) -> Self {
let (task_tx, task_rx) =
mpsc::channel(config.max_pending_compaction_tasks);
let task_handle = {
- let rt = runtime.clone();
let store = store.clone();
let manifest = manifest.clone();
let write_props = config.write_props.clone();
+ let executor = Executor::new(
+ runtime.clone(),
+ store,
+ schema,
+ manifest,
+ sst_path_gen,
+ parquet_reader,
+ write_props,
+ config.memory_limit,
+ );
+
runtime.spawn(async move {
- Self::recv_task_loop(
- rt,
- task_rx,
- store,
- schema,
- manifest,
- sst_path_gen,
- parquet_reader,
- config.memory_limit,
- write_props,
- )
- .await;
+ Self::recv_task_loop(task_rx, executor).await;
})
};
let picker_handle = {
let task_tx = task_tx.clone();
runtime.spawn(async move {
- Self::generate_task_loop(manifest, task_tx, segment_duration,
config).await;
+ let picker = Picker::new(
+ manifest,
+ config.ttl,
+ segment_duration,
+ config.new_sst_max_size,
+ config.input_sst_max_num,
+ );
+ Self::generate_task_loop(task_tx, picker,
config.schedule_interval).await;
})
};
@@ -92,60 +93,22 @@ impl Scheduler {
task_tx,
task_handle,
picker_handle,
- inused_memory: AtomicU64::new(0),
}
}
- pub fn try_send(&self, task: Task) -> Result<()> {
- self.task_tx
- .try_send(task)
- .context("failed to send task to scheduler")?;
-
- Ok(())
- }
-
- async fn recv_task_loop(
- rt: RuntimeRef,
- mut task_rx: Receiver<Task>,
- store: ObjectStoreRef,
- schema: StorageSchema,
- manifest: ManifestRef,
- sst_path_gen: Arc<SstPathGenerator>,
- parquet_reader: Arc<ParquetReader>,
- _mem_limit: u64,
- write_props: WriterProperties,
- ) {
- let runner = Runner::new(
- store,
- schema,
- manifest,
- sst_path_gen,
- parquet_reader,
- write_props,
- );
+ async fn recv_task_loop(mut task_rx: Receiver<Task>, executor: Executor) {
while let Some(task) = task_rx.recv().await {
- let runner = runner.clone();
- rt.spawn(async move {
- if let Err(e) = runner.do_compaction(task).await {
- warn!("Do compaction failed, err:{e}");
- }
- });
+ executor.submit(task);
}
}
async fn generate_task_loop(
- manifest: ManifestRef,
task_tx: Sender<Task>,
- segment_duration: Duration,
- config: SchedulerConfig,
+ picker: Picker,
+ schedule_interval: Duration,
) {
- let schedule_interval = config.schedule_interval;
- let compactor = TimeWindowCompactionStrategy::new(segment_duration,
config);
- // TODO: obtain expire time
- let expire_time = None;
loop {
- let ssts = manifest.all_ssts().await;
- if let Some(task) = compactor.pick_candidate(ssts, expire_time) {
+ if let Some(task) = picker.pick_candidate().await {
if let Err(e) = task_tx.try_send(task) {
warn!("Send task failed, err:{e}");
}
@@ -159,20 +122,26 @@ impl Scheduler {
#[derive(Clone)]
pub struct SchedulerConfig {
pub schedule_interval: Duration,
- pub memory_limit: u64,
pub max_pending_compaction_tasks: usize,
- pub compaction_files_limit: usize,
+ // Runner config
+ pub memory_limit: u64,
pub write_props: WriterProperties,
+ // Picker config
+ pub ttl: Option<Duration>,
+ pub new_sst_max_size: u64,
+ pub input_sst_max_num: usize,
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
schedule_interval: Duration::from_secs(30),
- memory_limit: bytesize::gb(2_u64),
max_pending_compaction_tasks: 10,
- compaction_files_limit: 10,
+ memory_limit: bytesize::gb(3_u64),
write_props: WriterProperties::default(),
+ ttl: None,
+ new_sst_max_size: bytesize::gb(1_u64),
+ input_sst_max_num: 10,
}
}
}
diff --git a/src/metric_engine/src/lib.rs b/src/metric_engine/src/lib.rs
index 26580cb3..c9e12359 100644
--- a/src/metric_engine/src/lib.rs
+++ b/src/metric_engine/src/lib.rs
@@ -29,5 +29,6 @@ pub mod storage;
#[cfg(test)]
mod test_util;
pub mod types;
+pub(crate) mod util;
pub use error::{AnyhowError, Error, Result};
diff --git a/src/metric_engine/src/manifest.rs
b/src/metric_engine/src/manifest.rs
index 1a50b90e..17b188d6 100644
--- a/src/metric_engine/src/manifest.rs
+++ b/src/metric_engine/src/manifest.rs
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+mod encoding;
use std::{
- collections::HashSet,
io::{Cursor, Write},
sync::{
atomic::{AtomicUsize, Ordering},
@@ -29,16 +29,14 @@ use anyhow::Context;
use async_scoped::TokioScope;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use bytes::Bytes;
+pub use encoding::ManifestUpdate;
use futures::{StreamExt, TryStreamExt};
use object_store::{path::Path, PutPayload};
use parquet::data_type::AsBytes;
use prost::Message;
-use tokio::{
- runtime::Runtime,
- sync::{
- mpsc::{self, Receiver, Sender},
- RwLock,
- },
+use tokio::sync::{
+ mpsc::{self, Receiver, Sender},
+ RwLock,
};
use tracing::error;
use uuid::Uuid;
@@ -53,89 +51,31 @@ use crate::{
pub const PREFIX_PATH: &str = "manifest";
pub const SNAPSHOT_FILENAME: &str = "snapshot";
pub const DELTA_PREFIX: &str = "delta";
-pub const TOMBSTONE_PREFIX: &str = "tombstone";
pub type ManifestRef = Arc<Manifest>;
pub struct Manifest {
delta_dir: Path,
- tombstone_dir: Path,
store: ObjectStoreRef,
merger: Arc<ManifestMerger>,
- payload: RwLock<Payload>,
-}
-
-#[derive(Default)]
-pub struct Payload {
- files: Vec<SstFile>,
-}
-
-impl Payload {
- // TODO: we could sort sst files by name asc, then the dedup will be more
- // efficient
- pub fn dedup_files(&mut self) {
- let mut seen = HashSet::with_capacity(self.files.len());
- self.files.retain(|file| seen.insert(file.id()));
- }
-}
-
-impl TryFrom<Bytes> for Payload {
- type Error = Error;
-
- fn try_from(value: Bytes) -> Result<Self> {
- if value.is_empty() {
- Ok(Self::default())
- } else {
- let snapshot = Snapshot::try_from(value)?;
- let files = snapshot.to_sstfiles()?;
- Ok(Self { files })
- }
- }
-}
-
-impl TryFrom<pb_types::Manifest> for Payload {
- type Error = Error;
-
- fn try_from(value: pb_types::Manifest) -> Result<Self> {
- let files = value
- .files
- .into_iter()
- .map(SstFile::try_from)
- .collect::<Result<Vec<_>>>()?;
-
- Ok(Self { files })
- }
-}
-
-impl From<Payload> for pb_types::Manifest {
- fn from(value: Payload) -> Self {
- pb_types::Manifest {
- files: value
- .files
- .into_iter()
- .map(pb_types::SstFile::from)
- .collect(),
- }
- }
+ ssts: RwLock<Vec<SstFile>>,
}
impl Manifest {
pub async fn try_new(
root_dir: String,
store: ObjectStoreRef,
- runtime: Arc<Runtime>,
+ runtime: RuntimeRef,
merge_options: ManifestMergeOptions,
) -> Result<Self> {
let snapshot_path =
Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
let delta_dir =
Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
- let tombstone_dir =
Path::from(format!("{root_dir}/{PREFIX_PATH}/{TOMBSTONE_PREFIX}"));
let merger = ManifestMerger::try_new(
snapshot_path.clone(),
delta_dir.clone(),
store.clone(),
- runtime.clone(),
merge_options,
)
.await?;
@@ -148,42 +88,46 @@ impl Manifest {
});
}
- let bytes = read_object(&store, &snapshot_path).await?;
- // TODO: add upgrade logic
- let payload = bytes.try_into()?;
+ let snapshot = read_snapshot(&store, &snapshot_path).await?;
+ let ssts = snapshot.into_ssts();
Ok(Self {
delta_dir,
- tombstone_dir,
store,
merger,
- payload: RwLock::new(payload),
+ ssts: RwLock::new(ssts),
})
}
pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> {
- self.merger.maybe_schedule_merge().await?;
-
- let new_sst_path = Path::from(format!("{}/{id}", self.delta_dir));
- let new_sst = SstFile::new(id, meta);
+ let update = ManifestUpdate::new(vec![SstFile::new(id, meta)],
Vec::new());
+ self.update(update).await
+ }
- let new_sst_payload = pb_types::SstFile::from(new_sst.clone());
- let mut buf: Vec<u8> =
Vec::with_capacity(new_sst_payload.encoded_len());
- new_sst_payload
+ pub async fn update(&self, update: ManifestUpdate) -> Result<()> {
+ self.merger.maybe_schedule_merge().await?;
+ let path = Path::from(format!("{}/{}", self.delta_dir,
Uuid::new_v4()));
+ let pb_update = pb_types::ManifestUpdate::from(update.clone());
+ let mut buf: Vec<u8> = Vec::with_capacity(pb_update.encoded_len());
+ pb_update
.encode(&mut buf)
- .context("failed to encode manifest file")?;
- let put_payload = PutPayload::from_bytes(Bytes::from(buf));
+ .context("failed to encode manifest update")?;
// 1. Persist the delta manifest
self.store
- .put(&new_sst_path, put_payload)
+ .put(&path, PutPayload::from_bytes(Bytes::from(buf)))
.await
- .with_context(|| format!("Failed to write delta manifest,
path:{}", new_sst_path))?;
+ .with_context(|| format!("Failed to write delta manifest,
path:{}", path))?;
// 2. Update cached payload
{
- let mut payload = self.payload.write().await;
- payload.files.push(new_sst);
+ let mut ssts = self.ssts.write().await;
+ for file in update.to_adds {
+ ssts.push(file);
+ }
+ // TODO: sort files in payload, so we can delete files more
+ // efficiently.
+ ssts.retain(|file| !update.to_deletes.contains(&file.id()));
}
// 3. Update delta files num
@@ -192,27 +136,16 @@ impl Manifest {
Ok(())
}
- // TODO: recovery tombstone files when startup
- pub async fn add_tombstone_files<I>(&self, files: I) -> Result<()>
- where
- I: IntoIterator<Item = SstFile>,
- {
- let path = Path::from(format!("{}/{}", self.tombstone_dir,
Uuid::new_v4()));
- todo!()
- }
-
// TODO: avoid clone
pub async fn all_ssts(&self) -> Vec<SstFile> {
- let payload = self.payload.read().await;
- payload.files.clone()
+ let ssts = self.ssts.read().await;
+ ssts.clone()
}
pub async fn find_ssts(&self, time_range: &TimeRange) -> Vec<SstFile> {
- let payload = self.payload.read().await;
+ let ssts = self.ssts.read().await;
- payload
- .files
- .iter()
+ ssts.iter()
.filter(move |f| f.meta().time_range.overlaps(time_range))
.cloned()
.collect()
@@ -231,6 +164,7 @@ impl Manifest {
/// - The length field (u64) represents the total length of the subsequent
/// records and serves as a straightforward method for verifying their
/// integrity. (length = record_length * record_count)
+#[derive(Debug)]
struct SnapshotHeader {
pub magic: u32,
pub version: u8,
@@ -279,13 +213,10 @@ impl SnapshotHeader {
}
}
- pub fn write_to(&self, writer: &mut &mut [u8]) -> Result<()> {
- ensure!(
- writer.len() >= SnapshotHeader::LENGTH,
- "writer buf is too small for writing the header, length: {}",
- writer.len()
- );
-
+ pub fn write_to<W>(&self, mut writer: W) -> Result<()>
+ where
+ W: Write,
+ {
writer
.write_u32::<LittleEndian>(self.magic)
.context("write shall not fail.")?;
@@ -319,13 +250,10 @@ impl SnapshotRecordV1 {
const LENGTH: usize = 8 /*id*/+ 16 /*time range*/ + 4 /*size*/ + 4 /*num
rows*/;
pub const VERSION: u8 = 1;
- pub fn write_to(&self, writer: &mut &mut [u8]) -> Result<()> {
- ensure!(
- writer.len() >= SnapshotRecordV1::LENGTH,
- "writer buf is too small for writing the record, length: {}",
- writer.len()
- );
-
+ pub fn write_to<W>(&self, mut writer: W) -> Result<()>
+ where
+ W: Write,
+ {
writer
.write_u64::<LittleEndian>(self.id)
.context("write shall not fail.")?;
@@ -399,7 +327,7 @@ impl From<SnapshotRecordV1> for SstFile {
struct Snapshot {
header: SnapshotHeader,
- inner: Bytes,
+ records: Vec<SnapshotRecordV1>,
}
impl Default for Snapshot {
@@ -408,7 +336,7 @@ impl Default for Snapshot {
let header = SnapshotHeader::new(0);
Self {
header,
- inner: Bytes::new(),
+ records: Vec::new(),
}
}
}
@@ -421,82 +349,57 @@ impl TryFrom<Bytes> for Snapshot {
return Ok(Snapshot::default());
}
let header = SnapshotHeader::try_from(bytes.as_bytes())?;
- let header_length = header.length as usize;
+ let record_total_length = header.length as usize;
ensure!(
- header_length > 0
- && header_length % SnapshotRecordV1::LENGTH == 0
- && header_length + SnapshotHeader::LENGTH == bytes.len(),
- "create snapshot from bytes failed, invalid bytes, header length =
{}, total length: {}",
- header_length,
- bytes.len()
+ record_total_length > 0
+ && record_total_length % SnapshotRecordV1::LENGTH == 0
+ && record_total_length + SnapshotHeader::LENGTH == bytes.len(),
+ "create snapshot from bytes failed, header:{header:?},
bytes_length: {}",
+ bytes.len()
);
+ let mut index = SnapshotHeader::LENGTH;
+ let mut records = Vec::with_capacity(record_total_length /
SnapshotRecordV1::LENGTH);
+ while index < bytes.len() {
+ let record =
+ SnapshotRecordV1::try_from(&bytes[index..index +
SnapshotRecordV1::LENGTH])?;
+ records.push(record);
+ index += SnapshotRecordV1::LENGTH;
+ }
- Ok(Self {
- header,
- inner: bytes,
- })
+ Ok(Self { header, records })
}
}
impl Snapshot {
- pub fn to_sstfiles(&self) -> Result<Vec<SstFile>> {
+ pub fn into_ssts(self) -> Vec<SstFile> {
if self.header.length == 0 {
- Ok(Vec::new())
+ Vec::new()
} else {
- let buf = self.inner.as_bytes();
- let mut result: Vec<SstFile> =
- Vec::with_capacity(self.header.length as usize /
SnapshotRecordV1::LENGTH);
- let mut index = SnapshotHeader::LENGTH;
- while index < buf.len() {
- let record =
- SnapshotRecordV1::try_from(&buf[index..index +
SnapshotRecordV1::LENGTH])?;
- index += SnapshotRecordV1::LENGTH;
- result.push(record.into());
- }
-
- Ok(result)
+ self.records.into_iter().map(|r| r.into()).collect()
}
}
- pub fn dedup_sstfiles(&self, sstfiles: &mut Vec<SstFile>) -> Result<()> {
- let buf = self.inner.as_bytes();
- let mut ids = HashSet::new();
- let mut index = SnapshotHeader::LENGTH;
- while index < buf.len() {
- let record = SnapshotRecordV1::try_from(&buf[index..index +
SnapshotRecordV1::LENGTH])?;
- index += SnapshotRecordV1::LENGTH;
- ids.insert(record.id());
- }
- sstfiles.retain(|item| !ids.contains(&item.id()));
+ // TODO: Ensure no files duplicated
+ // https://github.com/apache/horaedb/issues/1608
+ pub fn merge_update(&mut self, update: ManifestUpdate) -> Result<()> {
+ self.records
+ .extend(update.to_adds.into_iter().map(SnapshotRecordV1::from));
+ self.records
+ .retain(|record| !update.to_deletes.contains(&record.id));
+ self.header.length = (self.records.len() * SnapshotRecordV1::LENGTH)
as u64;
Ok(())
}
- pub fn merge_sstfiles(&mut self, sstfiles: Vec<SstFile>) {
- // update header
- self.header.length += (sstfiles.len() * SnapshotRecordV1::LENGTH) as
u64;
- // final snapshot
- let mut snapshot = vec![0u8; SnapshotHeader::LENGTH +
self.header.length as usize];
- let mut writer = snapshot.as_mut_slice();
-
- // write new head
- self.header.write_to(&mut writer).unwrap();
- // write old records
- if !self.inner.is_empty() {
- writer
- .write_all(&self.inner.as_bytes()[SnapshotHeader::LENGTH..])
- .unwrap();
- }
- // write new records
- for sst in sstfiles {
- let record: SnapshotRecordV1 = sst.into();
- record.write_to(&mut writer).unwrap();
- }
- self.inner = Bytes::from(snapshot);
- }
+ pub fn into_bytes(self) -> Result<Bytes> {
+ let buf = Vec::with_capacity(self.header.length as usize +
SnapshotHeader::LENGTH);
+ let mut cursor = Cursor::new(buf);
- pub fn into_bytes(self) -> Bytes {
- self.inner
+ self.header.write_to(&mut cursor)?;
+ for record in self.records {
+ record.write_to(&mut cursor).unwrap();
+ }
+ Ok(Bytes::from(cursor.into_inner()))
}
}
@@ -509,7 +412,6 @@ struct ManifestMerger {
snapshot_path: Path,
delta_dir: Path,
store: ObjectStoreRef,
- runtime: RuntimeRef,
sender: Sender<MergeType>,
receiver: RwLock<Receiver<MergeType>>,
deltas_num: AtomicUsize,
@@ -521,7 +423,6 @@ impl ManifestMerger {
snapshot_path: Path,
delta_dir: Path,
store: ObjectStoreRef,
- runtime: Arc<Runtime>,
merge_options: ManifestMergeOptions,
) -> Result<Arc<Self>> {
let (tx, rx) = mpsc::channel(merge_options.channel_size);
@@ -529,7 +430,6 @@ impl ManifestMerger {
snapshot_path,
delta_dir,
store,
- runtime,
sender: tx,
receiver: RwLock::new(rx),
deltas_num: AtomicUsize::new(0),
@@ -602,17 +502,12 @@ impl ManifestMerger {
}
});
- let mut delta_files = Vec::with_capacity(results.len());
+ let mut snapshot = read_snapshot(&self.store,
&self.snapshot_path).await?;
for res in results {
- let sst_file = res.context("Failed to join read delta files
task")??;
- delta_files.push(sst_file);
+ let manifest_update = res.context("Failed to join read delta files
task")??;
+ snapshot.merge_update(manifest_update)?;
}
- let snapshot_bytes = read_object(&self.store,
&self.snapshot_path).await?;
- let mut snapshot = Snapshot::try_from(snapshot_bytes)?;
- // TODO: no need to dedup every time.
- snapshot.dedup_sstfiles(&mut delta_files)?;
- snapshot.merge_sstfiles(delta_files);
- let snapshot_bytes = snapshot.into_bytes();
+ let snapshot_bytes = snapshot.into_bytes()?;
let put_payload = PutPayload::from_bytes(snapshot_bytes);
// 1. Persist the snapshot
self.store
@@ -646,38 +541,18 @@ impl ManifestMerger {
}
}
-async fn read_object(store: &ObjectStoreRef, path: &Path) -> Result<Bytes> {
- match store.get(path).await {
- Ok(v) => v
- .bytes()
- .await
- .with_context(|| format!("Failed to read manifest snapshot,
path:{path}"))
- .map_err(|e| e.into()),
- Err(err) => {
- if err.to_string().contains("not found") {
- Ok(Bytes::new())
- } else {
- let context = format!("Failed to read file, path:{path}");
- Err(AnyhowError::new(err).context(context).into())
- }
- }
- }
-}
-
-async fn read_snapshot(store: &ObjectStoreRef, path: &Path) -> Result<Payload>
{
+async fn read_snapshot(store: &ObjectStoreRef, path: &Path) ->
Result<Snapshot> {
match store.get(path).await {
Ok(v) => {
let bytes = v
.bytes()
.await
.with_context(|| format!("Failed to read manifest snapshot,
path:{path}"))?;
- let pb_payload = pb_types::Manifest::decode(bytes)
- .with_context(|| format!("Failed to decode manifest snapshot,
path:{path}"))?;
- Payload::try_from(pb_payload)
+ Snapshot::try_from(bytes)
}
Err(err) => {
if err.to_string().contains("not found") {
- Ok(Payload { files: vec![] })
+ Ok(Snapshot::default())
} else {
let context = format!("Failed to read manifest snapshot,
path:{path}");
Err(AnyhowError::new(err).context(context).into())
@@ -686,7 +561,7 @@ async fn read_snapshot(store: &ObjectStoreRef, path: &Path)
-> Result<Payload> {
}
}
-async fn read_delta_file(store: &ObjectStoreRef, sst_path: &Path) ->
Result<SstFile> {
+async fn read_delta_file(store: &ObjectStoreRef, sst_path: &Path) ->
Result<ManifestUpdate> {
let bytes = store
.get(sst_path)
.await
@@ -695,12 +570,12 @@ async fn read_delta_file(store: &ObjectStoreRef, sst_path: &Path) -> Result<SstF
.await
         .with_context(|| format!("failed to read delta file, path:{sst_path}"))?;
- let pb_sst = pb_types::SstFile::decode(bytes)
+ let pb_update = pb_types::ManifestUpdate::decode(bytes)
         .with_context(|| format!("failed to decode delta file, path:{sst_path}"))?;
- let sst = SstFile::try_from(pb_sst)
+ let update = ManifestUpdate::try_from(pb_update)
         .with_context(|| format!("failed to convert delta file, path:{sst_path}"))?;
- Ok(sst)
+ Ok(update)
}
async fn delete_delta_file(store: &ObjectStoreRef, path: &Path) -> Result<()> {
@@ -730,43 +605,30 @@ async fn list_delta_paths(store: &ObjectStoreRef, delta_dir: &Path) -> Result<Ve
mod tests {
use std::sync::Arc;
+ use itertools::Itertools;
use object_store::local::LocalFileSystem;
use tokio::time::sleep;
use super::*;
- #[tokio::test]
- async fn test_find_manifest() {
+ #[test]
+ fn test_find_manifest() {
let root_dir = temp_dir::TempDir::new().unwrap();
- let runtime = tokio::runtime::Runtime::new().unwrap();
+ let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
+ let rt = runtime.clone();
let store = Arc::new(LocalFileSystem::new());
- let manifest = Manifest::try_new(
- root_dir.path().to_string_lossy().to_string(),
- store,
- Arc::new(runtime),
- ManifestMergeOptions::default(),
- )
- .await
- .unwrap();
-
- for i in 0..20 {
- let time_range = (i..i + 1).into();
- let meta = FileMeta {
- max_sequence: i as u64,
- num_rows: i as u32,
- size: i as u32,
- time_range,
- };
- manifest.add_file(i as u64, meta).await.unwrap();
- }
-
- let find_range = (10..15).into();
- let mut ssts = manifest.find_ssts(&find_range).await;
+ rt.block_on(async move {
+ let manifest = Manifest::try_new(
+ root_dir.path().to_string_lossy().to_string(),
+ store,
+ runtime.clone(),
+ ManifestMergeOptions::default(),
+ )
+ .await
+ .unwrap();
- let mut expected_ssts = (10..15)
- .map(|i| {
- let id = i as u64;
+ for i in 0..20 {
let time_range = (i..i + 1).into();
let meta = FileMeta {
max_sequence: i as u64,
@@ -774,17 +636,34 @@ mod tests {
size: i as u32,
time_range,
};
- SstFile::new(id, meta)
- })
- .collect::<Vec<_>>();
+ manifest.add_file(i as u64, meta).await.unwrap();
+ }
- expected_ssts.sort_by_key(|a| a.id());
- ssts.sort_by_key(|a| a.id());
- assert_eq!(expected_ssts, ssts);
+ let find_range = (10..15).into();
+ let mut ssts = manifest.find_ssts(&find_range).await;
+
+ let mut expected_ssts = (10..15)
+ .map(|i| {
+ let id = i as u64;
+ let time_range = (i..i + 1).into();
+ let meta = FileMeta {
+ max_sequence: i as u64,
+ num_rows: i as u32,
+ size: i as u32,
+ time_range,
+ };
+ SstFile::new(id, meta)
+ })
+ .collect::<Vec<_>>();
+
+ expected_ssts.sort_by_key(|a| a.id());
+ ssts.sort_by_key(|a| a.id());
+ assert_eq!(expected_ssts, ssts);
+ });
}
- #[tokio::test]
- async fn test_merge_manifest() {
+ #[test]
+ fn test_merge_manifest() {
let root_dir = temp_dir::TempDir::new()
.unwrap()
.path()
@@ -792,72 +671,53 @@ mod tests {
.to_string();
         let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
         let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
- let runtime = tokio::runtime::Builder::new_multi_thread()
- .worker_threads(4)
- .enable_all()
- .build()
+ let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
+ let rt = runtime.clone();
+
+ rt.block_on(async move {
+ let store: ObjectStoreRef = Arc::new(LocalFileSystem::new());
+ let manifest = Manifest::try_new(
+ root_dir,
+ store.clone(),
+ runtime.clone(),
+ ManifestMergeOptions {
+ merge_interval_seconds: 1,
+ ..Default::default()
+ },
+ )
+ .await
.unwrap();
- let store: ObjectStoreRef = Arc::new(LocalFileSystem::new());
- let manifest = Manifest::try_new(
- root_dir,
- store.clone(),
- Arc::new(runtime),
- ManifestMergeOptions {
- merge_interval_seconds: 1,
- ..Default::default()
- },
- )
- .await
- .unwrap();
-
- // Add manifest files
- for i in 0..20 {
- let time_range = (i..i + 1).into();
- let meta = FileMeta {
- max_sequence: i as u64,
- num_rows: i as u32,
- size: i as u32,
- time_range,
- };
- manifest.add_file(i as u64, meta).await.unwrap();
- }
+ // Add manifest files
+ for i in 0..20 {
+ let time_range = (i..i + 1).into();
+ let meta = FileMeta {
+ max_sequence: i as u64,
+ num_rows: i as u32,
+ size: i as u32,
+ time_range,
+ };
+ manifest.add_file(i as u64, meta).await.unwrap();
+ }
- // Wait for merge manifest to finish
- sleep(Duration::from_secs(2)).await;
-
- let mut mem_ssts = manifest.payload.read().await.files.clone();
- let snapshot = read_object(&store, &snapshot_path).await.unwrap();
- let snapshot_len = snapshot.len();
- let payload: Payload = snapshot.try_into().unwrap();
- let mut ssts = payload.files;
-
- mem_ssts.sort_by_key(|a| a.id());
- ssts.sort_by_key(|a| a.id());
- assert_eq!(mem_ssts, ssts);
-
- let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
- assert!(delta_paths.is_empty());
-
- // Add manifest files again to verify dedup
- for i in 0..20 {
- let time_range = (i..i + 1).into();
- let meta = FileMeta {
- max_sequence: i as u64,
- num_rows: i as u32,
- size: i as u32,
- time_range,
- };
- manifest.add_file(i as u64, meta).await.unwrap();
- }
+ // Wait for merge manifest to finish
+ sleep(Duration::from_secs(2)).await;
- // Wait for merge manifest to finish
- sleep(Duration::from_secs(2)).await;
+ let mut mem_ssts = manifest.ssts.read().await.clone();
+            let snapshot = read_snapshot(&store, &snapshot_path).await.unwrap();
+ let mut ssts = snapshot
+ .records
+ .into_iter()
+ .map(SstFile::from)
+ .collect_vec();
+
+ mem_ssts.sort_by_key(|a| a.id());
+ ssts.sort_by_key(|a| a.id());
+ assert_eq!(mem_ssts, ssts);
-        let snapshot_again = read_object(&store, &snapshot_path).await.unwrap();
- assert!(snapshot_len == snapshot_again.len()); // dedup took effect.
- let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
- assert!(delta_paths.is_empty());
+            let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
+ assert!(delta_paths.is_empty());
+ })
}
#[test]
@@ -885,11 +745,6 @@ mod tests {
257, // length
cursor.read_u64::<LittleEndian>().unwrap()
);
-
- let mut vec = [0u8; SnapshotHeader::LENGTH - 1];
- let mut writer = vec.as_mut_slice();
- let result = header.write_to(&mut writer);
- assert!(result.is_err()); // buf not enough
}
#[test]
@@ -931,9 +786,5 @@ mod tests {
100, // num rows
cursor.read_u32::<LittleEndian>().unwrap()
);
- let mut vec = vec![0u8; SnapshotRecordV1::LENGTH - 1];
- let mut writer = vec.as_mut_slice();
- let result = record.write_to(&mut writer);
- assert!(result.is_err()); // buf not enough
}
}
diff --git a/src/metric_engine/src/manifest/encoding.rs b/src/metric_engine/src/manifest/encoding.rs
new file mode 100644
index 00000000..01deb7a5
--- /dev/null
+++ b/src/metric_engine/src/manifest/encoding.rs
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{
+ sst::{FileId, SstFile},
+ Error, Result,
+};
+
+#[derive(Clone, Debug)]
+pub struct ManifestUpdate {
+ pub to_adds: Vec<SstFile>,
+ pub to_deletes: Vec<FileId>,
+}
+
+impl ManifestUpdate {
+ pub fn new(to_adds: Vec<SstFile>, to_deletes: Vec<FileId>) -> Self {
+ Self {
+ to_adds,
+ to_deletes,
+ }
+ }
+}
+
+impl TryFrom<pb_types::ManifestUpdate> for ManifestUpdate {
+ type Error = Error;
+
+ fn try_from(value: pb_types::ManifestUpdate) -> Result<Self> {
+ let to_adds = value
+ .to_adds
+ .into_iter()
+ .map(SstFile::try_from)
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(Self {
+ to_adds,
+ to_deletes: value.to_deletes,
+ })
+ }
+}
+
+impl From<ManifestUpdate> for pb_types::ManifestUpdate {
+ fn from(value: ManifestUpdate) -> Self {
+ let to_adds = value
+ .to_adds
+ .into_iter()
+ .map(pb_types::SstFile::from)
+ .collect();
+
+ pb_types::ManifestUpdate {
+ to_adds,
+ to_deletes: value.to_deletes,
+ }
+ }
+}
diff --git a/src/metric_engine/src/sst.rs b/src/metric_engine/src/sst.rs
index 1cbfedec..070a62ed 100644
--- a/src/metric_engine/src/sst.rs
+++ b/src/metric_engine/src/sst.rs
@@ -74,6 +74,10 @@ impl SstFile {
self.inner.in_compaction.store(true, Ordering::Relaxed);
}
+ pub fn unmark_compaction(&self) {
+ self.inner.in_compaction.store(false, Ordering::Relaxed);
+ }
+
pub fn is_compaction(&self) -> bool {
self.inner.in_compaction.load(Ordering::Relaxed)
}
diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs
index 37cc904e..f15d67ff 100644
--- a/src/metric_engine/src/storage.rs
+++ b/src/metric_engine/src/storage.rs
@@ -125,6 +125,7 @@ impl StorageRuntimes {
///
/// Compaction will be done by merging segments within a segment, and segment
/// will make it easy to support expiration.
+#[allow(dead_code)]
pub struct CloudObjectStorage {
segment_duration: Duration,
path: String,
@@ -150,7 +151,7 @@ pub struct CloudObjectStorage {
/// ```
/// `root_path` is composed of `path` and `segment_duration`.
impl CloudObjectStorage {
- pub async fn try_new(
+ pub fn try_new(
path: String,
segment_duration: Duration,
store: ObjectStoreRef,
@@ -159,15 +160,16 @@ impl CloudObjectStorage {
storage_opts: StorageOptions,
) -> Result<Self> {
let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
- let manifest = Arc::new(
+ let manifest = runtimes.manifest_compact_runtime.block_on(async {
Manifest::try_new(
path.clone(),
store.clone(),
runtimes.manifest_compact_runtime.clone(),
storage_opts.manifest_merge_opts,
)
- .await?,
- );
+ .await
+ })?;
+ let manifest = Arc::new(manifest);
let schema = {
             let value_idxes = (num_primary_keys..arrow_schema.fields.len()).collect::<Vec<_>>();
ensure!(!value_idxes.is_empty(), "no value column found");
@@ -426,8 +428,8 @@ mod tests {
use super::*;
     use crate::{arrow_schema, record_batch, test_util::check_stream, types::Timestamp};
- #[test(tokio::test)]
- async fn test_storage_write_and_scan() {
+ #[test(test)]
+ fn test_storage_write_and_scan() {
         let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value", Int64));
let root_dir = temp_dir::TempDir::new().unwrap();
let store = Arc::new(LocalFileSystem::new());
@@ -439,67 +441,68 @@ mod tests {
2, // num_primary_keys
StorageOptions::default(),
)
- .await
- .unwrap();
-
- let batch = record_batch!(
- ("pk1", UInt8, vec![11, 11, 9, 10, 5]),
- ("pk2", UInt8, vec![100, 100, 1, 2, 3]),
- ("value", Int64, vec![2, 7, 4, 6, 1])
- )
.unwrap();
- storage
- .write(WriteRequest {
- batch,
- time_range: (1..10).into(),
- enable_check: true,
- })
- .await
- .unwrap();
- let batch = record_batch!(
- ("pk1", UInt8, vec![11, 11, 9, 10]),
- ("pk2", UInt8, vec![100, 99, 1, 2]),
- ("value", Int64, vec![22, 77, 44, 66])
- )
- .unwrap();
- storage
- .write(WriteRequest {
- batch,
- time_range: (10..20).into(),
- enable_check: true,
- })
- .await
+ storage.runtimes.sst_compact_runtime.block_on(async {
+ let batch = record_batch!(
+ ("pk1", UInt8, vec![11, 11, 9, 10, 5]),
+ ("pk2", UInt8, vec![100, 100, 1, 2, 3]),
+ ("value", Int64, vec![2, 7, 4, 6, 1])
+ )
.unwrap();
+ storage
+ .write(WriteRequest {
+ batch,
+ time_range: (1..10).into(),
+ enable_check: true,
+ })
+ .await
+ .unwrap();
- let result_stream = storage
- .scan(ScanRequest {
- range: TimeRange::new(Timestamp(0), Timestamp::MAX),
- predicate: vec![],
- projections: None,
- })
- .await
- .unwrap();
- let expected_batch = [
- record_batch!(
- ("pk1", UInt8, vec![5, 9, 10, 11]),
- ("pk2", UInt8, vec![3, 1, 2, 99]),
- ("value", Int64, vec![1, 44, 66, 77])
- )
- .unwrap(),
- record_batch!(
- ("pk1", UInt8, vec![11]),
- ("pk2", UInt8, vec![100]),
- ("value", Int64, vec![22])
+ let batch = record_batch!(
+ ("pk1", UInt8, vec![11, 11, 9, 10]),
+ ("pk2", UInt8, vec![100, 99, 1, 2]),
+ ("value", Int64, vec![22, 77, 44, 66])
)
- .unwrap(),
- ];
+ .unwrap();
+ storage
+ .write(WriteRequest {
+ batch,
+ time_range: (10..20).into(),
+ enable_check: true,
+ })
+ .await
+ .unwrap();
- check_stream(result_stream, expected_batch).await;
+ let result_stream = storage
+ .scan(ScanRequest {
+ range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+ predicate: vec![],
+ projections: None,
+ })
+ .await
+ .unwrap();
+ let expected_batch = [
+ record_batch!(
+ ("pk1", UInt8, vec![5, 9, 10, 11]),
+ ("pk2", UInt8, vec![3, 1, 2, 99]),
+ ("value", Int64, vec![1, 44, 66, 77])
+ )
+ .unwrap(),
+ record_batch!(
+ ("pk1", UInt8, vec![11]),
+ ("pk2", UInt8, vec![100]),
+ ("value", Int64, vec![22])
+ )
+ .unwrap(),
+ ];
+
+ check_stream(result_stream, expected_batch).await;
+ });
}
- #[tokio::test]
- async fn test_storage_sort_batch() {
+ #[test]
+ fn test_storage_sort_batch() {
         let schema = arrow_schema!(("a", UInt8), ("b", UInt8), ("c", UInt8), ("c", UInt8));
let root_dir = temp_dir::TempDir::new().unwrap();
let store = Arc::new(LocalFileSystem::new());
@@ -511,32 +514,32 @@ mod tests {
1,
StorageOptions::default(),
)
- .await
- .unwrap();
-
- let batch = record_batch!(
- ("a", UInt8, vec![2, 1, 3, 4, 8, 6, 5, 7]),
- ("b", UInt8, vec![1, 3, 4, 8, 2, 6, 5, 7]),
- ("c", UInt8, vec![8, 6, 2, 4, 3, 1, 5, 7]),
- ("d", UInt8, vec![2, 7, 4, 6, 1, 3, 5, 8])
- )
.unwrap();
+ storage.runtimes.sst_compact_runtime.block_on(async {
+ let batch = record_batch!(
+ ("a", UInt8, vec![2, 1, 3, 4, 8, 6, 5, 7]),
+ ("b", UInt8, vec![1, 3, 4, 8, 2, 6, 5, 7]),
+ ("c", UInt8, vec![8, 6, 2, 4, 3, 1, 5, 7]),
+ ("d", UInt8, vec![2, 7, 4, 6, 1, 3, 5, 8])
+ )
+ .unwrap();
- let mut sorted_batches = storage.sort_batch(batch).await.unwrap();
- let expected_bacth = record_batch!(
- ("a", UInt8, vec![1, 2, 3, 4, 5, 6, 7, 8]),
- ("b", UInt8, vec![3, 1, 4, 8, 5, 6, 7, 2]),
- ("c", UInt8, vec![6, 8, 2, 4, 5, 1, 7, 3]),
- ("d", UInt8, vec![7, 2, 4, 6, 5, 3, 8, 1])
- )
- .unwrap();
- let mut offset = 0;
- while let Some(sorted_batch) = sorted_batches.next().await {
- let sorted_batch = sorted_batch.unwrap();
- let length = sorted_batch.num_rows();
- let batch = expected_bacth.slice(offset, length);
- assert_eq!(sorted_batch, batch);
- offset += length;
- }
+ let mut sorted_batches = storage.sort_batch(batch).await.unwrap();
+ let expected_bacth = record_batch!(
+ ("a", UInt8, vec![1, 2, 3, 4, 5, 6, 7, 8]),
+ ("b", UInt8, vec![3, 1, 4, 8, 5, 6, 7, 2]),
+ ("c", UInt8, vec![6, 8, 2, 4, 5, 1, 7, 3]),
+ ("d", UInt8, vec![7, 2, 4, 6, 5, 3, 8, 1])
+ )
+ .unwrap();
+ let mut offset = 0;
+ while let Some(sorted_batch) = sorted_batches.next().await {
+ let sorted_batch = sorted_batch.unwrap();
+ let length = sorted_batch.num_rows();
+ let batch = expected_bacth.slice(offset, length);
+ assert_eq!(sorted_batch, batch);
+ offset += length;
+ }
+ });
}
}
diff --git a/src/metric_engine/src/lib.rs b/src/metric_engine/src/util.rs
similarity index 75%
copy from src/metric_engine/src/lib.rs
copy to src/metric_engine/src/util.rs
index 26580cb3..f48e8834 100644
--- a/src/metric_engine/src/lib.rs
+++ b/src/metric_engine/src/util.rs
@@ -15,19 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-//! Storage Engine for metrics.
+use std::time::{SystemTime, UNIX_EPOCH};
-#![feature(duration_constructors)]
-mod compaction;
-pub mod error;
-mod macros;
-mod manifest;
-pub mod operator;
-mod read;
-mod sst;
-pub mod storage;
-#[cfg(test)]
-mod test_util;
-pub mod types;
-
-pub use error::{AnyhowError, Error, Result};
+/// Current time in milliseconds.
+pub fn now() -> i64 {
+ let now = SystemTime::now();
+ let duration = now.duration_since(UNIX_EPOCH).unwrap();
+ duration.as_millis() as i64
+}
diff --git a/src/pb_types/protos/sst.proto b/src/pb_types/protos/sst.proto
index 5312ffa7..3c5d9040 100644
--- a/src/pb_types/protos/sst.proto
+++ b/src/pb_types/protos/sst.proto
@@ -41,11 +41,7 @@ message SstFile {
SstMeta meta = 2;
}
-message Manifest {
- repeated SstFile files = 1;
-}
-
-message MetaUpdate {
+message ManifestUpdate {
repeated SstFile to_adds = 1;
- repeated uint64 to_removes = 2;
+ repeated uint64 to_deletes = 2;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]