This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 941ea072 feat: add metric manager (#1621)
941ea072 is described below
commit 941ea0722c76be2a6d8e46ac1487e22085855a42
Author: Jiacai Liu <[email protected]>
AuthorDate: Tue Jan 14 11:52:47 2025 +0800
feat: add metric manager (#1621)
---
Cargo.lock | 68 +++--
Cargo.toml | 4 +-
Makefile | 3 +-
src/benchmarks/Cargo.toml | 2 +-
src/benchmarks/src/encoding_bench.rs | 2 +-
src/common/Cargo.toml | 2 +
src/{metric_engine => common}/src/error.rs | 0
src/common/src/lib.rs | 2 +
src/metric_engine/Cargo.toml | 17 +-
src/metric_engine/README.md | 40 +++
src/metric_engine/src/{compaction => data}/mod.rs | 32 ++-
src/metric_engine/src/{compaction => index}/mod.rs | 32 ++-
src/metric_engine/src/lib.rs | 22 +-
.../src/{compaction => metric}/mod.rs | 32 ++-
src/metric_engine/src/types.rs | 304 ++-------------------
src/server/Cargo.toml | 2 +-
src/server/src/config.rs | 2 +-
src/server/src/main.rs | 2 +-
src/{metric_engine => storage}/Cargo.toml | 5 +-
.../src/compaction/executor.rs | 0
.../src/compaction/mod.rs | 0
.../src/compaction/picker.rs | 0
.../src/compaction/scheduler.rs | 0
src/{metric_engine => storage}/src/config.rs | 0
src/{metric_engine => storage}/src/lib.rs | 6 +-
src/{metric_engine => storage}/src/macros.rs | 0
.../src/manifest/encoding.rs | 0
src/{metric_engine => storage}/src/manifest/mod.rs | 0
src/{metric_engine => storage}/src/operator.rs | 0
src/{metric_engine => storage}/src/read.rs | 0
src/{metric_engine => storage}/src/sst.rs | 0
src/{metric_engine => storage}/src/storage.rs | 0
src/{metric_engine => storage}/src/test_util.rs | 0
src/{metric_engine => storage}/src/types.rs | 0
34 files changed, 177 insertions(+), 402 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 0a559050..98c71ab0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -632,10 +632,10 @@ dependencies = [
"bytes",
"common",
"criterion",
- "metric_engine",
"pb_types",
"prost",
"serde",
+ "storage",
"toml",
"tracing",
"tracing-subscriber",
@@ -734,12 +734,6 @@ version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
-[[package]]
-name = "bytesize"
-version = "1.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc"
-
[[package]]
name = "bytestring"
version = "1.4.0"
@@ -914,7 +908,9 @@ dependencies = [
name = "common"
version = "2.2.0-alpha"
dependencies = [
+ "anyhow",
"serde",
+ "thiserror",
"toml",
]
@@ -2158,28 +2154,13 @@ name = "metric_engine"
version = "2.2.0-alpha"
dependencies = [
"anyhow",
- "arrow",
- "async-scoped",
- "async-trait",
- "byteorder",
- "bytes",
- "bytesize",
"common",
- "datafusion",
- "futures",
- "itertools 0.3.25",
- "lazy_static",
- "object_store",
- "parquet",
- "pb_types",
- "prost",
- "serde",
+ "storage",
"temp-dir",
"test-log",
"thiserror",
"tokio",
"tracing",
- "uuid",
]
[[package]]
@@ -2918,10 +2899,10 @@ dependencies = [
"clap",
"common",
"futures",
- "metric_engine",
"object_store",
"rand",
"serde",
+ "storage",
"tokio",
"toml",
"tracing",
@@ -3059,6 +3040,32 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
+[[package]]
+name = "storage"
+version = "2.2.0-alpha"
+dependencies = [
+ "anyhow",
+ "arrow",
+ "async-scoped",
+ "async-trait",
+ "byteorder",
+ "bytes",
+ "common",
+ "datafusion",
+ "futures",
+ "itertools 0.3.25",
+ "lazy_static",
+ "object_store",
+ "parquet",
+ "pb_types",
+ "prost",
+ "serde",
+ "temp-dir",
+ "test-log",
+ "tokio",
+ "tracing",
+]
+
[[package]]
name = "strsim"
version = "0.11.1"
@@ -3465,19 +3472,6 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
dependencies = [
"getrandom",
- "rand",
- "uuid-macro-internal",
-]
-
-[[package]]
-name = "uuid-macro-internal"
-version = "1.11.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b91f57fe13a38d0ce9e28a03463d8d3c2468ed03d75375110ec71d93b449a08"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 9d7dc825..d5d24cf8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,12 +31,14 @@ members = [
"src/common",
"src/metric_engine",
"src/pb_types",
- "src/server"
+ "src/server",
+ "src/storage"
]
[workspace.dependencies]
anyhow = { version = "1.0" }
metric_engine = { path = "src/metric_engine" }
+horaedb_storage = { package = "storage", path = "src/storage" }
common = { path = "src/common" }
thiserror = "1"
bytes = "1"
diff --git a/Makefile b/Makefile
index 6b4f287f..ce8beee5 100644
--- a/Makefile
+++ b/Makefile
@@ -47,7 +47,8 @@ udeps:
cd $(DIR); cargo udeps --all-targets --all-features --workspace
clippy:
- cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D
warnings -D clippy::dbg-macro -A clippy::too-many-arguments
+ cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D
warnings -D clippy::dbg-macro -A clippy::too-many-arguments \
+ -A dead_code
ensure-disk-quota:
bash ./scripts/free-disk-space.sh
diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml
index 27c536ac..7db01ea0 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/benchmarks/Cargo.toml
@@ -28,7 +28,7 @@ description.workspace = true
[dependencies]
bytes = { workspace = true }
common = { workspace = true }
-metric_engine = { workspace = true }
+horaedb_storage = { workspace = true }
pb_types = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
diff --git a/src/benchmarks/src/encoding_bench.rs
b/src/benchmarks/src/encoding_bench.rs
index 8a899f61..ff5e3551 100644
--- a/src/benchmarks/src/encoding_bench.rs
+++ b/src/benchmarks/src/encoding_bench.rs
@@ -18,7 +18,7 @@
//! encoding bench.
use bytes::Bytes;
-use metric_engine::{
+use horaedb_storage::{
manifest::Snapshot,
sst::{FileMeta, SstFile},
};
diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml
index dcc79abd..db9b863e 100644
--- a/src/common/Cargo.toml
+++ b/src/common/Cargo.toml
@@ -26,5 +26,7 @@ homepage.workspace = true
description.workspace = true
[dependencies]
+anyhow = { workspace = true }
serde = { workspace = true }
+thiserror = { workspace = true }
toml = { workspace = true }
diff --git a/src/metric_engine/src/error.rs b/src/common/src/error.rs
similarity index 100%
rename from src/metric_engine/src/error.rs
rename to src/common/src/error.rs
diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs
index eb405e27..40fab4d2 100644
--- a/src/common/src/lib.rs
+++ b/src/common/src/lib.rs
@@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+mod error;
mod size_ext;
mod time_ext;
+pub use error::{AnyhowError, Error, Result};
pub use size_ext::ReadableSize;
pub use time_ext::{now, ReadableDuration};
diff --git a/src/metric_engine/Cargo.toml b/src/metric_engine/Cargo.toml
index 850c5cfe..ae3acf25 100644
--- a/src/metric_engine/Cargo.toml
+++ b/src/metric_engine/Cargo.toml
@@ -27,26 +27,11 @@ description.workspace = true
[dependencies]
anyhow = { workspace = true }
-arrow = { workspace = true }
-async-scoped = { workspace = true }
-async-trait = { workspace = true }
-byteorder = { workspace = true }
-bytes = { workspace = true }
-bytesize = { workspace = true }
common = { workspace = true }
-datafusion = { workspace = true }
-futures = { workspace = true }
-itertools = { workspace = true }
-lazy_static = { workspace = true }
-object_store = { workspace = true }
-parquet = { workspace = true, features = ["object_store"] }
-pb_types = { workspace = true }
-prost = { workspace = true }
-serde = { workspace = true }
+horaedb_storage = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
-uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] }
[dev-dependencies]
temp-dir = { workspace = true }
diff --git a/src/metric_engine/README.md b/src/metric_engine/README.md
new file mode 100644
index 00000000..54ed475e
--- /dev/null
+++ b/src/metric_engine/README.md
@@ -0,0 +1,40 @@
+# Metric Engine
+
+The basic write process is as follows:
+
+```plaintext
+ +----------------+
+ | start |
+ +----------------+
+ |
+ |
+ v
++ - - - - - - - - - -+
+' Metric Engine '
+' '
+' +----------------+ '
+' | metric_manager | '
+' +----------------+ '
+' | '
+' | '
+' v '
+' +----------------+ '
+' | index_manager | '
+' +----------------+ '
+' | '
+' | '
+' v '
+' +----------------+ '
+' | data_manager | '
+' +----------------+ '
+' '
++ - - - - - - - - - -+
+ |
+ |
+ v
+ +----------------+
+ | end |
+ +----------------+
+```
+
+The structure passed between the different modules is `Sample`, modeled after the [data
model](https://prometheus.io/docs/concepts/data_model/) used in Prometheus.
diff --git a/src/metric_engine/src/compaction/mod.rs
b/src/metric_engine/src/data/mod.rs
similarity index 59%
copy from src/metric_engine/src/compaction/mod.rs
copy to src/metric_engine/src/data/mod.rs
index fe8c0ddd..96d1a9d7 100644
--- a/src/metric_engine/src/compaction/mod.rs
+++ b/src/metric_engine/src/data/mod.rs
@@ -15,22 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-mod executor;
-mod picker;
-mod scheduler;
+use std::sync::Arc;
-pub use scheduler::Scheduler as CompactionScheduler;
+use horaedb_storage::storage::TimeMergeStorageRef;
-use crate::sst::SstFile;
+use crate::{types::Sample, Result};
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct Task {
- pub inputs: Vec<SstFile>,
- pub expireds: Vec<SstFile>,
+pub struct SampleManager {
+ inner: Arc<Inner>,
}
-impl Task {
- pub fn input_size(&self) -> u64 {
- self.inputs.iter().map(|f| f.size() as u64).sum()
+impl SampleManager {
+ pub fn new(storage: TimeMergeStorageRef) -> Self {
+ Self {
+ inner: Arc::new(Inner { storage }),
+ }
}
+
+ /// Persist samples into the underlying storage.
+ /// Not implemented yet (currently `todo!()`).
+ pub async fn persist(&self, _samples: Vec<Sample>) -> Result<()> {
+ todo!()
+ }
+}
+
+struct Inner {
+ storage: TimeMergeStorageRef,
}
diff --git a/src/metric_engine/src/compaction/mod.rs
b/src/metric_engine/src/index/mod.rs
similarity index 59%
copy from src/metric_engine/src/compaction/mod.rs
copy to src/metric_engine/src/index/mod.rs
index fe8c0ddd..4d98db4f 100644
--- a/src/metric_engine/src/compaction/mod.rs
+++ b/src/metric_engine/src/index/mod.rs
@@ -15,22 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-mod executor;
-mod picker;
-mod scheduler;
+use std::sync::Arc;
-pub use scheduler::Scheduler as CompactionScheduler;
+use horaedb_storage::storage::TimeMergeStorageRef;
-use crate::sst::SstFile;
+use crate::{types::Sample, Result};
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct Task {
- pub inputs: Vec<SstFile>,
- pub expireds: Vec<SstFile>,
+pub struct IndexManager {
+ inner: Arc<Inner>,
}
-impl Task {
- pub fn input_size(&self) -> u64 {
- self.inputs.iter().map(|f| f.size() as u64).sum()
+impl IndexManager {
+ pub fn new(storage: TimeMergeStorageRef) -> Self {
+ Self {
+ inner: Arc::new(Inner { storage }),
+ }
}
+
+ /// Populate series ids from labels.
+ /// It will also build inverted index for labels.
+ pub async fn populate_series_ids(&self, _samples: &mut [Sample]) ->
Result<()> {
+ todo!()
+ }
+}
+
+struct Inner {
+ storage: TimeMergeStorageRef,
}
diff --git a/src/metric_engine/src/lib.rs b/src/metric_engine/src/lib.rs
index a993fb74..2c64fe33 100644
--- a/src/metric_engine/src/lib.rs
+++ b/src/metric_engine/src/lib.rs
@@ -15,20 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-//! Storage Engine for metrics.
+//! Metric Engine entry point.
-#![feature(duration_constructors)]
-mod compaction;
-pub mod config;
-pub mod error;
-mod macros;
-pub mod manifest;
-pub mod operator;
-mod read;
-pub mod sst;
-pub mod storage;
-#[cfg(test)]
-mod test_util;
-pub mod types;
+mod metric;
+mod types;
-pub use error::{AnyhowError, Error, Result};
+// Re-export error types.
+pub type AnyhowError = common::AnyhowError;
+pub type Error = common::Error;
+pub type Result<T> = common::Result<T>;
diff --git a/src/metric_engine/src/compaction/mod.rs
b/src/metric_engine/src/metric/mod.rs
similarity index 58%
copy from src/metric_engine/src/compaction/mod.rs
copy to src/metric_engine/src/metric/mod.rs
index fe8c0ddd..d5164097 100644
--- a/src/metric_engine/src/compaction/mod.rs
+++ b/src/metric_engine/src/metric/mod.rs
@@ -15,22 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-mod executor;
-mod picker;
-mod scheduler;
+use std::sync::Arc;
-pub use scheduler::Scheduler as CompactionScheduler;
+use horaedb_storage::storage::TimeMergeStorageRef;
-use crate::sst::SstFile;
+use crate::{types::Sample, Result};
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct Task {
- pub inputs: Vec<SstFile>,
- pub expireds: Vec<SstFile>,
+pub struct MetricManager {
+ inner: Arc<Inner>,
}
-impl Task {
- pub fn input_size(&self) -> u64 {
- self.inputs.iter().map(|f| f.size() as u64).sum()
+impl MetricManager {
+ pub fn new(storage: TimeMergeStorageRef) -> Self {
+ Self {
+ inner: Arc::new(Inner { storage }),
+ }
}
+
+ /// Populate metric ids from names.
+ /// If a name does not exist, it will be created on demand.
+ pub async fn populate_metric_ids(&self, _samples: &mut [Sample]) ->
Result<()> {
+ todo!()
+ }
+}
+
+struct Inner {
+ storage: TimeMergeStorageRef,
}
diff --git a/src/metric_engine/src/types.rs b/src/metric_engine/src/types.rs
index 2f905fd6..d371de23 100644
--- a/src/metric_engine/src/types.rs
+++ b/src/metric_engine/src/types.rs
@@ -15,289 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-use std::{
- fmt,
- ops::{Add, Deref, Range},
- sync::Arc,
- time::Duration,
-};
-
-use anyhow::Context;
-use arrow::{
- array::{RecordBatch, UInt64Array},
- datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
-};
-use object_store::ObjectStore;
-use tokio::runtime::Runtime;
-
-use crate::{config::UpdateMode, ensure, sst::FileId, Result};
-
-pub const BUILTIN_COLUMN_NUM: usize = 2;
-/// Seq column is a builtin column, and it will be appended to the end of
-/// user-defined schema.
-pub const SEQ_COLUMN_NAME: &str = "__seq__";
-/// This column is reserved for internal use, and it can be used to store
-/// tombstone/expiration bit-flags.
-pub const RESERVED_COLUMN_NAME: &str = "__reserved__";
-
-pub type RuntimeRef = Arc<Runtime>;
-
-#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
-pub struct Timestamp(pub i64);
-
-impl Add for Timestamp {
- type Output = Self;
-
- fn add(self, rhs: Self) -> Self::Output {
- Self(self.0 + rhs.0)
- }
-}
-
-impl Add<i64> for Timestamp {
- type Output = Self;
-
- fn add(self, rhs: i64) -> Self::Output {
- Self(self.0 + rhs)
- }
-}
-
-impl From<i64> for Timestamp {
- fn from(value: i64) -> Self {
- Self(value)
- }
-}
-
-impl Deref for Timestamp {
- type Target = i64;
-
- fn deref(&self) -> &Self::Target {
- &self.0
- }
-}
-
-impl Timestamp {
- pub const MAX: Timestamp = Timestamp(i64::MAX);
- pub const MIN: Timestamp = Timestamp(i64::MIN);
-
- pub fn truncate_by(&self, duration: Duration) -> Self {
- let duration_millis = duration.as_millis() as i64;
- Timestamp(self.0 / duration_millis * duration_millis)
- }
-}
-
-#[derive(Clone, PartialEq, Eq)]
-pub struct TimeRange(Range<Timestamp>);
-
-impl fmt::Debug for TimeRange {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "[{}, {})", self.0.start.0, self.0.end.0)
- }
-}
-
-impl From<Range<Timestamp>> for TimeRange {
- fn from(value: Range<Timestamp>) -> Self {
- Self(value)
- }
-}
-
-impl From<Range<i64>> for TimeRange {
- fn from(value: Range<i64>) -> Self {
- Self(Range {
- start: value.start.into(),
- end: value.end.into(),
- })
- }
-}
-
-impl Deref for TimeRange {
- type Target = Range<Timestamp>;
-
- fn deref(&self) -> &Self::Target {
- &self.0
- }
-}
-
-impl TimeRange {
- pub fn new(start: Timestamp, end: Timestamp) -> Self {
- Self(start..end)
- }
-
- pub fn overlaps(&self, other: &TimeRange) -> bool {
- self.0.start < other.0.end && other.0.start < self.0.end
- }
-
- pub fn merge(&mut self, other: &TimeRange) {
- self.0.start = self.0.start.min(other.0.start);
- self.0.end = self.0.end.max(other.0.end);
- }
-}
-
-pub type ObjectStoreRef = Arc<dyn ObjectStore>;
-
-pub struct WriteResult {
- pub id: FileId,
- pub seq: u64,
- pub size: usize,
-}
-
-/// The schema is like:
-/// ```plaintext
-/// primary_key1, primary_key2, ..., primary_keyN, value1, value2, ...,
valueM, seq, reserved
-/// ```
-/// seq and reserved are builtin columns, and they will be appended to the end
-/// of the original schema.
-#[derive(Debug, Clone)]
-pub struct StorageSchema {
- pub arrow_schema: SchemaRef,
- pub num_primary_keys: usize,
- pub seq_idx: usize,
- pub reserved_idx: usize,
- pub value_idxes: Vec<usize>,
- pub update_mode: UpdateMode,
-}
-
-impl StorageSchema {
- pub fn try_new(
- arrow_schema: SchemaRef,
- num_primary_keys: usize,
- update_mode: UpdateMode,
- ) -> Result<Self> {
- ensure!(num_primary_keys > 0, "num_primary_keys should large than 0");
-
- let fields = arrow_schema.fields();
- ensure!(
- !fields.iter().any(Self::is_builtin_field),
- "schema should not use builtin columns name"
- );
-
- let value_idxes =
(num_primary_keys..arrow_schema.fields.len()).collect::<Vec<_>>();
- ensure!(!value_idxes.is_empty(), "no value column found");
-
- let mut new_fields = arrow_schema.fields().clone().to_vec();
- new_fields.extend_from_slice(&[
- Arc::new(Field::new(SEQ_COLUMN_NAME, DataType::UInt64, true)),
- Arc::new(Field::new(RESERVED_COLUMN_NAME, DataType::UInt64, true)),
- ]);
- let seq_idx = new_fields.len() - 2;
- let reserved_idx = new_fields.len() - 1;
-
- let arrow_schema = Arc::new(Schema::new_with_metadata(
- new_fields,
- arrow_schema.metadata.clone(),
- ));
- Ok(Self {
- arrow_schema,
- num_primary_keys,
- seq_idx,
- reserved_idx,
- value_idxes,
- update_mode,
- })
- }
-
- pub fn is_builtin_field(f: &FieldRef) -> bool {
- f.name() == SEQ_COLUMN_NAME || f.name() == RESERVED_COLUMN_NAME
- }
-
- /// Primary keys and builtin columns are required when query.
- pub fn fill_required_projections(&self, projection: &mut
Option<Vec<usize>>) {
- if let Some(proj) = projection.as_mut() {
- for i in 0..self.num_primary_keys {
- if !proj.contains(&i) {
- proj.push(i);
- }
- }
- // For builtin columns, reserved column is not used for now,
- // so only add seq column.
- if !proj.contains(&self.seq_idx) {
- proj.push(self.seq_idx);
- }
- }
- }
-
- /// Builtin columns are always appended to the end of the schema.
- pub fn fill_builtin_columns(
- &self,
- record_batch: RecordBatch,
- sequence: u64,
- ) -> Result<RecordBatch> {
- let num_rows = record_batch.num_rows();
- if num_rows == 0 {
- return Ok(record_batch);
- }
-
- let mut columns = record_batch.columns().to_vec();
- let seq_array = UInt64Array::from_iter_values((0..num_rows).map(|_|
sequence));
- columns.push(Arc::new(seq_array));
- let reserved_array = UInt64Array::new_null(num_rows);
- columns.push(Arc::new(reserved_array));
-
- let new_batch = RecordBatch::try_new(self.arrow_schema.clone(),
columns)
- .context("construct record batch with seq column")?;
-
- Ok(new_batch)
- }
-}
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::{arrow_schema, record_batch};
-
- #[test]
- fn test_timestamp_truncate_by() {
- let testcases = [
- // ts, segment, expected
- (0, 20, 0),
- (10, 20, 0),
- (20, 20, 20),
- (30, 20, 20),
- (40, 20, 40),
- (41, 20, 40),
- ];
- for (ts, segment, expected) in testcases {
- let actual =
Timestamp::from(ts).truncate_by(Duration::from_millis(segment));
- assert_eq!(actual.0, expected);
- }
- }
-
- #[test]
- fn test_build_storage_schema() {
- let arrow_schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8),
("value", Int64));
- let schema = StorageSchema::try_new(arrow_schema.clone(), 2,
UpdateMode::Append).unwrap();
- assert_eq!(schema.value_idxes, vec![2]);
- assert_eq!(schema.seq_idx, 3);
- assert_eq!(schema.reserved_idx, 4);
-
- // No value column exists
- assert!(StorageSchema::try_new(arrow_schema, 3,
UpdateMode::Append).is_err());
-
- let batch = record_batch!(
- ("pk1", UInt8, vec![11, 11, 9, 10]),
- ("pk2", UInt8, vec![100, 99, 1, 2]),
- ("value", Int64, vec![22, 77, 44, 66])
- )
- .unwrap();
- let sequence = 999;
- let new_batch = schema.fill_builtin_columns(batch, sequence).unwrap();
- let expected_batch = record_batch!(
- ("pk1", UInt8, vec![11, 11, 9, 10]),
- ("pk2", UInt8, vec![100, 99, 1, 2]),
- ("value", Int64, vec![22, 77, 44, 66]),
- (SEQ_COLUMN_NAME, UInt64, vec![sequence; 4]),
- (RESERVED_COLUMN_NAME, UInt64, vec![None; 4])
- )
- .unwrap();
- assert_eq!(new_batch, expected_batch);
-
- let mut testcases = [
- (None, None),
- (Some(vec![]), Some(vec![0, 1, 3])),
- (Some(vec![1]), Some(vec![1, 0, 3])),
- (Some(vec![2]), Some(vec![2, 0, 1, 3])),
- ];
- for (input, expected) in testcases.iter_mut() {
- schema.fill_required_projections(input);
- assert_eq!(input, expected);
- }
- }
+pub struct MetricId(u64);
+pub struct SeriesId(u64);
+
+pub struct Label {
+ pub name: Vec<u8>,
+ pub value: Vec<u8>,
+}
+
+/// This is the main struct used for writes; its optional fields are filled in
+/// by different modules.
+pub struct Sample {
+ pub name: Vec<u8>,
+ pub lables: Vec<Label>,
+ pub timestamp: i64,
+ pub value: f64,
+ /// hash of name
+ pub name_id: Option<MetricId>,
+ /// hash of labels(sorted)
+ pub series_id: Option<SeriesId>,
}
diff --git a/src/server/Cargo.toml b/src/server/Cargo.toml
index 8136b7b7..c6964978 100644
--- a/src/server/Cargo.toml
+++ b/src/server/Cargo.toml
@@ -31,7 +31,7 @@ arrow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
common = { workspace = true }
futures = { workspace = true }
-metric_engine = { workspace = true }
+horaedb_storage = { workspace = true }
object_store = { workspace = true }
rand = "0.8"
serde = { workspace = true }
diff --git a/src/server/src/config.rs b/src/server/src/config.rs
index 57ebc18b..668a895e 100644
--- a/src/server/src/config.rs
+++ b/src/server/src/config.rs
@@ -83,7 +83,7 @@ impl Default for ThreadConfig {
#[serde(default, deny_unknown_fields)]
pub struct StorageConfig {
pub object_store: ObjectStorageConfig,
- pub time_merge_storage: metric_engine::config::StorageConfig,
+ pub time_merge_storage: horaedb_storage::config::StorageConfig,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
diff --git a/src/server/src/main.rs b/src/server/src/main.rs
index 055aa4a0..df154bc3 100644
--- a/src/server/src/main.rs
+++ b/src/server/src/main.rs
@@ -38,7 +38,7 @@ use arrow::{
};
use clap::Parser;
use config::{Config, ObjectStorageConfig};
-use metric_engine::{
+use horaedb_storage::{
storage::{
CloudObjectStorage, CompactRequest, StorageRuntimes,
TimeMergeStorageRef, WriteRequest,
},
diff --git a/src/metric_engine/Cargo.toml b/src/storage/Cargo.toml
similarity index 91%
copy from src/metric_engine/Cargo.toml
copy to src/storage/Cargo.toml
index 850c5cfe..a6ee671d 100644
--- a/src/metric_engine/Cargo.toml
+++ b/src/storage/Cargo.toml
@@ -16,7 +16,7 @@
# under the License.
[package]
-name = "metric_engine"
+name = "storage"
version.workspace = true
authors.workspace = true
edition.workspace = true
@@ -32,7 +32,6 @@ async-scoped = { workspace = true }
async-trait = { workspace = true }
byteorder = { workspace = true }
bytes = { workspace = true }
-bytesize = { workspace = true }
common = { workspace = true }
datafusion = { workspace = true }
futures = { workspace = true }
@@ -43,10 +42,8 @@ parquet = { workspace = true, features = ["object_store"] }
pb_types = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
-thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
-uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] }
[dev-dependencies]
temp-dir = { workspace = true }
diff --git a/src/metric_engine/src/compaction/executor.rs
b/src/storage/src/compaction/executor.rs
similarity index 100%
rename from src/metric_engine/src/compaction/executor.rs
rename to src/storage/src/compaction/executor.rs
diff --git a/src/metric_engine/src/compaction/mod.rs
b/src/storage/src/compaction/mod.rs
similarity index 100%
rename from src/metric_engine/src/compaction/mod.rs
rename to src/storage/src/compaction/mod.rs
diff --git a/src/metric_engine/src/compaction/picker.rs
b/src/storage/src/compaction/picker.rs
similarity index 100%
rename from src/metric_engine/src/compaction/picker.rs
rename to src/storage/src/compaction/picker.rs
diff --git a/src/metric_engine/src/compaction/scheduler.rs
b/src/storage/src/compaction/scheduler.rs
similarity index 100%
rename from src/metric_engine/src/compaction/scheduler.rs
rename to src/storage/src/compaction/scheduler.rs
diff --git a/src/metric_engine/src/config.rs b/src/storage/src/config.rs
similarity index 100%
rename from src/metric_engine/src/config.rs
rename to src/storage/src/config.rs
diff --git a/src/metric_engine/src/lib.rs b/src/storage/src/lib.rs
similarity index 87%
copy from src/metric_engine/src/lib.rs
copy to src/storage/src/lib.rs
index a993fb74..c16e39e4 100644
--- a/src/metric_engine/src/lib.rs
+++ b/src/storage/src/lib.rs
@@ -20,7 +20,6 @@
#![feature(duration_constructors)]
mod compaction;
pub mod config;
-pub mod error;
mod macros;
pub mod manifest;
pub mod operator;
@@ -31,4 +30,7 @@ pub mod storage;
mod test_util;
pub mod types;
-pub use error::{AnyhowError, Error, Result};
+// Re-export error types.
+pub type AnyhowError = common::AnyhowError;
+pub type Error = common::Error;
+pub type Result<T> = common::Result<T>;
diff --git a/src/metric_engine/src/macros.rs b/src/storage/src/macros.rs
similarity index 100%
rename from src/metric_engine/src/macros.rs
rename to src/storage/src/macros.rs
diff --git a/src/metric_engine/src/manifest/encoding.rs
b/src/storage/src/manifest/encoding.rs
similarity index 100%
rename from src/metric_engine/src/manifest/encoding.rs
rename to src/storage/src/manifest/encoding.rs
diff --git a/src/metric_engine/src/manifest/mod.rs
b/src/storage/src/manifest/mod.rs
similarity index 100%
rename from src/metric_engine/src/manifest/mod.rs
rename to src/storage/src/manifest/mod.rs
diff --git a/src/metric_engine/src/operator.rs b/src/storage/src/operator.rs
similarity index 100%
rename from src/metric_engine/src/operator.rs
rename to src/storage/src/operator.rs
diff --git a/src/metric_engine/src/read.rs b/src/storage/src/read.rs
similarity index 100%
rename from src/metric_engine/src/read.rs
rename to src/storage/src/read.rs
diff --git a/src/metric_engine/src/sst.rs b/src/storage/src/sst.rs
similarity index 100%
rename from src/metric_engine/src/sst.rs
rename to src/storage/src/sst.rs
diff --git a/src/metric_engine/src/storage.rs b/src/storage/src/storage.rs
similarity index 100%
rename from src/metric_engine/src/storage.rs
rename to src/storage/src/storage.rs
diff --git a/src/metric_engine/src/test_util.rs b/src/storage/src/test_util.rs
similarity index 100%
rename from src/metric_engine/src/test_util.rs
rename to src/storage/src/test_util.rs
diff --git a/src/metric_engine/src/types.rs b/src/storage/src/types.rs
similarity index 100%
copy from src/metric_engine/src/types.rs
copy to src/storage/src/types.rs
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]