This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 941ea072 feat: add metric manager (#1621)
941ea072 is described below

commit 941ea0722c76be2a6d8e46ac1487e22085855a42
Author: Jiacai Liu <[email protected]>
AuthorDate: Tue Jan 14 11:52:47 2025 +0800

    feat: add metric manager (#1621)
---
 Cargo.lock                                         |  68 +++--
 Cargo.toml                                         |   4 +-
 Makefile                                           |   3 +-
 src/benchmarks/Cargo.toml                          |   2 +-
 src/benchmarks/src/encoding_bench.rs               |   2 +-
 src/common/Cargo.toml                              |   2 +
 src/{metric_engine => common}/src/error.rs         |   0
 src/common/src/lib.rs                              |   2 +
 src/metric_engine/Cargo.toml                       |  17 +-
 src/metric_engine/README.md                        |  40 +++
 src/metric_engine/src/{compaction => data}/mod.rs  |  32 ++-
 src/metric_engine/src/{compaction => index}/mod.rs |  32 ++-
 src/metric_engine/src/lib.rs                       |  22 +-
 .../src/{compaction => metric}/mod.rs              |  32 ++-
 src/metric_engine/src/types.rs                     | 304 ++-------------------
 src/server/Cargo.toml                              |   2 +-
 src/server/src/config.rs                           |   2 +-
 src/server/src/main.rs                             |   2 +-
 src/{metric_engine => storage}/Cargo.toml          |   5 +-
 .../src/compaction/executor.rs                     |   0
 .../src/compaction/mod.rs                          |   0
 .../src/compaction/picker.rs                       |   0
 .../src/compaction/scheduler.rs                    |   0
 src/{metric_engine => storage}/src/config.rs       |   0
 src/{metric_engine => storage}/src/lib.rs          |   6 +-
 src/{metric_engine => storage}/src/macros.rs       |   0
 .../src/manifest/encoding.rs                       |   0
 src/{metric_engine => storage}/src/manifest/mod.rs |   0
 src/{metric_engine => storage}/src/operator.rs     |   0
 src/{metric_engine => storage}/src/read.rs         |   0
 src/{metric_engine => storage}/src/sst.rs          |   0
 src/{metric_engine => storage}/src/storage.rs      |   0
 src/{metric_engine => storage}/src/test_util.rs    |   0
 src/{metric_engine => storage}/src/types.rs        |   0
 34 files changed, 177 insertions(+), 402 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0a559050..98c71ab0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -632,10 +632,10 @@ dependencies = [
  "bytes",
  "common",
  "criterion",
- "metric_engine",
  "pb_types",
  "prost",
  "serde",
+ "storage",
  "toml",
  "tracing",
  "tracing-subscriber",
@@ -734,12 +734,6 @@ version = "1.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
 
-[[package]]
-name = "bytesize"
-version = "1.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc"
-
 [[package]]
 name = "bytestring"
 version = "1.4.0"
@@ -914,7 +908,9 @@ dependencies = [
 name = "common"
 version = "2.2.0-alpha"
 dependencies = [
+ "anyhow",
  "serde",
+ "thiserror",
  "toml",
 ]
 
@@ -2158,28 +2154,13 @@ name = "metric_engine"
 version = "2.2.0-alpha"
 dependencies = [
  "anyhow",
- "arrow",
- "async-scoped",
- "async-trait",
- "byteorder",
- "bytes",
- "bytesize",
  "common",
- "datafusion",
- "futures",
- "itertools 0.3.25",
- "lazy_static",
- "object_store",
- "parquet",
- "pb_types",
- "prost",
- "serde",
+ "storage",
  "temp-dir",
  "test-log",
  "thiserror",
  "tokio",
  "tracing",
- "uuid",
 ]
 
 [[package]]
@@ -2918,10 +2899,10 @@ dependencies = [
  "clap",
  "common",
  "futures",
- "metric_engine",
  "object_store",
  "rand",
  "serde",
+ "storage",
  "tokio",
  "toml",
  "tracing",
@@ -3059,6 +3040,32 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
 
+[[package]]
+name = "storage"
+version = "2.2.0-alpha"
+dependencies = [
+ "anyhow",
+ "arrow",
+ "async-scoped",
+ "async-trait",
+ "byteorder",
+ "bytes",
+ "common",
+ "datafusion",
+ "futures",
+ "itertools 0.3.25",
+ "lazy_static",
+ "object_store",
+ "parquet",
+ "pb_types",
+ "prost",
+ "serde",
+ "temp-dir",
+ "test-log",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "strsim"
 version = "0.11.1"
@@ -3465,19 +3472,6 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
 dependencies = [
  "getrandom",
- "rand",
- "uuid-macro-internal",
-]
-
-[[package]]
-name = "uuid-macro-internal"
-version = "1.11.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6b91f57fe13a38d0ce9e28a03463d8d3c2468ed03d75375110ec71d93b449a08"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 9d7dc825..d5d24cf8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,12 +31,14 @@ members = [
     "src/common",
     "src/metric_engine",
     "src/pb_types",
-    "src/server"
+    "src/server",
+    "src/storage"
 ]
 
 [workspace.dependencies]
 anyhow = { version = "1.0" }
 metric_engine = { path = "src/metric_engine" }
+horaedb_storage = { package = "storage", path = "src/storage" }
 common = { path = "src/common" }
 thiserror = "1"
 bytes = "1"
diff --git a/Makefile b/Makefile
index 6b4f287f..ce8beee5 100644
--- a/Makefile
+++ b/Makefile
@@ -47,7 +47,8 @@ udeps:
        cd $(DIR); cargo udeps --all-targets --all-features --workspace
 
 clippy:
-       cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D 
warnings -D clippy::dbg-macro -A clippy::too-many-arguments
+       cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D 
warnings -D clippy::dbg-macro -A clippy::too-many-arguments \
+       -A dead_code
 
 ensure-disk-quota:
        bash ./scripts/free-disk-space.sh
diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml
index 27c536ac..7db01ea0 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/benchmarks/Cargo.toml
@@ -28,7 +28,7 @@ description.workspace = true
 [dependencies]
 bytes = { workspace = true }
 common = { workspace = true }
-metric_engine = { workspace = true }
+horaedb_storage = { workspace = true }
 pb_types = { workspace = true }
 prost = { workspace = true }
 serde = { workspace = true }
diff --git a/src/benchmarks/src/encoding_bench.rs 
b/src/benchmarks/src/encoding_bench.rs
index 8a899f61..ff5e3551 100644
--- a/src/benchmarks/src/encoding_bench.rs
+++ b/src/benchmarks/src/encoding_bench.rs
@@ -18,7 +18,7 @@
 //! encoding bench.
 
 use bytes::Bytes;
-use metric_engine::{
+use horaedb_storage::{
     manifest::Snapshot,
     sst::{FileMeta, SstFile},
 };
diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml
index dcc79abd..db9b863e 100644
--- a/src/common/Cargo.toml
+++ b/src/common/Cargo.toml
@@ -26,5 +26,7 @@ homepage.workspace = true
 description.workspace = true
 
 [dependencies]
+anyhow = { workspace = true }
 serde = { workspace = true }
+thiserror = { workspace = true }
 toml = { workspace = true }
diff --git a/src/metric_engine/src/error.rs b/src/common/src/error.rs
similarity index 100%
rename from src/metric_engine/src/error.rs
rename to src/common/src/error.rs
diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs
index eb405e27..40fab4d2 100644
--- a/src/common/src/lib.rs
+++ b/src/common/src/lib.rs
@@ -15,8 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod error;
 mod size_ext;
 mod time_ext;
 
+pub use error::{AnyhowError, Error, Result};
 pub use size_ext::ReadableSize;
 pub use time_ext::{now, ReadableDuration};
diff --git a/src/metric_engine/Cargo.toml b/src/metric_engine/Cargo.toml
index 850c5cfe..ae3acf25 100644
--- a/src/metric_engine/Cargo.toml
+++ b/src/metric_engine/Cargo.toml
@@ -27,26 +27,11 @@ description.workspace = true
 
 [dependencies]
 anyhow = { workspace = true }
-arrow = { workspace = true }
-async-scoped = { workspace = true }
-async-trait = { workspace = true }
-byteorder = { workspace = true }
-bytes = { workspace = true }
-bytesize = { workspace = true }
 common = { workspace = true }
-datafusion = { workspace = true }
-futures = { workspace = true }
-itertools = { workspace = true }
-lazy_static = { workspace = true }
-object_store = { workspace = true }
-parquet = { workspace = true, features = ["object_store"] }
-pb_types = { workspace = true }
-prost = { workspace = true }
-serde = { workspace = true }
+horaedb_storage = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
-uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] }
 
 [dev-dependencies]
 temp-dir = { workspace = true }
diff --git a/src/metric_engine/README.md b/src/metric_engine/README.md
new file mode 100644
index 00000000..54ed475e
--- /dev/null
+++ b/src/metric_engine/README.md
@@ -0,0 +1,40 @@
+# Metric Engine
+
+The basic write process is as follows:
+
+```plaintext
+  +----------------+
+  |     start      |
+  +----------------+
+    |
+    |
+    v
++ - - - - - - - - - -+
+'   Metric Engine    '
+'                    '
+' +----------------+ '
+' | metric_manager | '
+' +----------------+ '
+'   |                '
+'   |                '
+'   v                '
+' +----------------+ '
+' | index_manager  | '
+' +----------------+ '
+'   |                '
+'   |                '
+'   v                '
+' +----------------+ '
+' |  data_manager  | '
+' +----------------+ '
+'                    '
++ - - - - - - - - - -+
+    |
+    |
+    v
+  +----------------+
+  |      end       |
+  +----------------+
+```
+
+The structure passed between the different modules is `Sample`, modeled after 
the [data model](https://prometheus.io/docs/concepts/data_model/) used in Prometheus.
diff --git a/src/metric_engine/src/compaction/mod.rs 
b/src/metric_engine/src/data/mod.rs
similarity index 59%
copy from src/metric_engine/src/compaction/mod.rs
copy to src/metric_engine/src/data/mod.rs
index fe8c0ddd..96d1a9d7 100644
--- a/src/metric_engine/src/compaction/mod.rs
+++ b/src/metric_engine/src/data/mod.rs
@@ -15,22 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod executor;
-mod picker;
-mod scheduler;
+use std::sync::Arc;
 
-pub use scheduler::Scheduler as CompactionScheduler;
+use horaedb_storage::storage::TimeMergeStorageRef;
 
-use crate::sst::SstFile;
+use crate::{types::Sample, Result};
 
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct Task {
-    pub inputs: Vec<SstFile>,
-    pub expireds: Vec<SstFile>,
+pub struct SampleManager {
+    inner: Arc<Inner>,
 }
 
-impl Task {
-    pub fn input_size(&self) -> u64 {
-        self.inputs.iter().map(|f| f.size() as u64).sum()
+impl SampleManager {
+    pub fn new(storage: TimeMergeStorageRef) -> Self {
+        Self {
+            inner: Arc::new(Inner { storage }),
+        }
     }
+
+    /// Populate series ids from labels.
+    /// It will also build inverted index for labels.
+    pub async fn persist(&self, _samples: Vec<Sample>) -> Result<()> {
+        todo!()
+    }
+}
+
+struct Inner {
+    storage: TimeMergeStorageRef,
 }
diff --git a/src/metric_engine/src/compaction/mod.rs 
b/src/metric_engine/src/index/mod.rs
similarity index 59%
copy from src/metric_engine/src/compaction/mod.rs
copy to src/metric_engine/src/index/mod.rs
index fe8c0ddd..4d98db4f 100644
--- a/src/metric_engine/src/compaction/mod.rs
+++ b/src/metric_engine/src/index/mod.rs
@@ -15,22 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod executor;
-mod picker;
-mod scheduler;
+use std::sync::Arc;
 
-pub use scheduler::Scheduler as CompactionScheduler;
+use horaedb_storage::storage::TimeMergeStorageRef;
 
-use crate::sst::SstFile;
+use crate::{types::Sample, Result};
 
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct Task {
-    pub inputs: Vec<SstFile>,
-    pub expireds: Vec<SstFile>,
+pub struct IndexManager {
+    inner: Arc<Inner>,
 }
 
-impl Task {
-    pub fn input_size(&self) -> u64 {
-        self.inputs.iter().map(|f| f.size() as u64).sum()
+impl IndexManager {
+    pub fn new(storage: TimeMergeStorageRef) -> Self {
+        Self {
+            inner: Arc::new(Inner { storage }),
+        }
     }
+
+    /// Populate series ids from labels.
+    /// It will also build inverted index for labels.
+    pub async fn populate_series_ids(&self, _samples: &mut [Sample]) -> 
Result<()> {
+        todo!()
+    }
+}
+
+struct Inner {
+    storage: TimeMergeStorageRef,
 }
diff --git a/src/metric_engine/src/lib.rs b/src/metric_engine/src/lib.rs
index a993fb74..2c64fe33 100644
--- a/src/metric_engine/src/lib.rs
+++ b/src/metric_engine/src/lib.rs
@@ -15,20 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Storage Engine for metrics.
+//! Metric Engine entry point.
 
-#![feature(duration_constructors)]
-mod compaction;
-pub mod config;
-pub mod error;
-mod macros;
-pub mod manifest;
-pub mod operator;
-mod read;
-pub mod sst;
-pub mod storage;
-#[cfg(test)]
-mod test_util;
-pub mod types;
+mod metric;
+mod types;
 
-pub use error::{AnyhowError, Error, Result};
+// Re-export error types.
+pub type AnyhowError = common::AnyhowError;
+pub type Error = common::Error;
+pub type Result<T> = common::Result<T>;
diff --git a/src/metric_engine/src/compaction/mod.rs 
b/src/metric_engine/src/metric/mod.rs
similarity index 58%
copy from src/metric_engine/src/compaction/mod.rs
copy to src/metric_engine/src/metric/mod.rs
index fe8c0ddd..d5164097 100644
--- a/src/metric_engine/src/compaction/mod.rs
+++ b/src/metric_engine/src/metric/mod.rs
@@ -15,22 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod executor;
-mod picker;
-mod scheduler;
+use std::sync::Arc;
 
-pub use scheduler::Scheduler as CompactionScheduler;
+use horaedb_storage::storage::TimeMergeStorageRef;
 
-use crate::sst::SstFile;
+use crate::{types::Sample, Result};
 
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct Task {
-    pub inputs: Vec<SstFile>,
-    pub expireds: Vec<SstFile>,
+pub struct MetricManager {
+    inner: Arc<Inner>,
 }
 
-impl Task {
-    pub fn input_size(&self) -> u64 {
-        self.inputs.iter().map(|f| f.size() as u64).sum()
+impl MetricManager {
+    pub fn new(storage: TimeMergeStorageRef) -> Self {
+        Self {
+            inner: Arc::new(Inner { storage }),
+        }
     }
+
+    /// Populate metric ids from names.
+    /// If a name does not exist, it will be created on demand.
+    pub async fn populate_metric_ids(&self, _samples: &mut [Sample]) -> 
Result<()> {
+        todo!()
+    }
+}
+
+struct Inner {
+    storage: TimeMergeStorageRef,
 }
diff --git a/src/metric_engine/src/types.rs b/src/metric_engine/src/types.rs
index 2f905fd6..d371de23 100644
--- a/src/metric_engine/src/types.rs
+++ b/src/metric_engine/src/types.rs
@@ -15,289 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{
-    fmt,
-    ops::{Add, Deref, Range},
-    sync::Arc,
-    time::Duration,
-};
-
-use anyhow::Context;
-use arrow::{
-    array::{RecordBatch, UInt64Array},
-    datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
-};
-use object_store::ObjectStore;
-use tokio::runtime::Runtime;
-
-use crate::{config::UpdateMode, ensure, sst::FileId, Result};
-
-pub const BUILTIN_COLUMN_NUM: usize = 2;
-/// Seq column is a builtin column, and it will be appended to the end of
-/// user-defined schema.
-pub const SEQ_COLUMN_NAME: &str = "__seq__";
-/// This column is reserved for internal use, and it can be used to store
-/// tombstone/expiration bit-flags.
-pub const RESERVED_COLUMN_NAME: &str = "__reserved__";
-
-pub type RuntimeRef = Arc<Runtime>;
-
-#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
-pub struct Timestamp(pub i64);
-
-impl Add for Timestamp {
-    type Output = Self;
-
-    fn add(self, rhs: Self) -> Self::Output {
-        Self(self.0 + rhs.0)
-    }
-}
-
-impl Add<i64> for Timestamp {
-    type Output = Self;
-
-    fn add(self, rhs: i64) -> Self::Output {
-        Self(self.0 + rhs)
-    }
-}
-
-impl From<i64> for Timestamp {
-    fn from(value: i64) -> Self {
-        Self(value)
-    }
-}
-
-impl Deref for Timestamp {
-    type Target = i64;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl Timestamp {
-    pub const MAX: Timestamp = Timestamp(i64::MAX);
-    pub const MIN: Timestamp = Timestamp(i64::MIN);
-
-    pub fn truncate_by(&self, duration: Duration) -> Self {
-        let duration_millis = duration.as_millis() as i64;
-        Timestamp(self.0 / duration_millis * duration_millis)
-    }
-}
-
-#[derive(Clone, PartialEq, Eq)]
-pub struct TimeRange(Range<Timestamp>);
-
-impl fmt::Debug for TimeRange {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "[{}, {})", self.0.start.0, self.0.end.0)
-    }
-}
-
-impl From<Range<Timestamp>> for TimeRange {
-    fn from(value: Range<Timestamp>) -> Self {
-        Self(value)
-    }
-}
-
-impl From<Range<i64>> for TimeRange {
-    fn from(value: Range<i64>) -> Self {
-        Self(Range {
-            start: value.start.into(),
-            end: value.end.into(),
-        })
-    }
-}
-
-impl Deref for TimeRange {
-    type Target = Range<Timestamp>;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl TimeRange {
-    pub fn new(start: Timestamp, end: Timestamp) -> Self {
-        Self(start..end)
-    }
-
-    pub fn overlaps(&self, other: &TimeRange) -> bool {
-        self.0.start < other.0.end && other.0.start < self.0.end
-    }
-
-    pub fn merge(&mut self, other: &TimeRange) {
-        self.0.start = self.0.start.min(other.0.start);
-        self.0.end = self.0.end.max(other.0.end);
-    }
-}
-
-pub type ObjectStoreRef = Arc<dyn ObjectStore>;
-
-pub struct WriteResult {
-    pub id: FileId,
-    pub seq: u64,
-    pub size: usize,
-}
-
-/// The schema is like:
-/// ```plaintext
-/// primary_key1, primary_key2, ..., primary_keyN, value1, value2, ..., 
valueM, seq, reserved
-/// ```
-/// seq and reserved are builtin columns, and they will be appended to the end
-/// of the original schema.
-#[derive(Debug, Clone)]
-pub struct StorageSchema {
-    pub arrow_schema: SchemaRef,
-    pub num_primary_keys: usize,
-    pub seq_idx: usize,
-    pub reserved_idx: usize,
-    pub value_idxes: Vec<usize>,
-    pub update_mode: UpdateMode,
-}
-
-impl StorageSchema {
-    pub fn try_new(
-        arrow_schema: SchemaRef,
-        num_primary_keys: usize,
-        update_mode: UpdateMode,
-    ) -> Result<Self> {
-        ensure!(num_primary_keys > 0, "num_primary_keys should large than 0");
-
-        let fields = arrow_schema.fields();
-        ensure!(
-            !fields.iter().any(Self::is_builtin_field),
-            "schema should not use builtin columns name"
-        );
-
-        let value_idxes = 
(num_primary_keys..arrow_schema.fields.len()).collect::<Vec<_>>();
-        ensure!(!value_idxes.is_empty(), "no value column found");
-
-        let mut new_fields = arrow_schema.fields().clone().to_vec();
-        new_fields.extend_from_slice(&[
-            Arc::new(Field::new(SEQ_COLUMN_NAME, DataType::UInt64, true)),
-            Arc::new(Field::new(RESERVED_COLUMN_NAME, DataType::UInt64, true)),
-        ]);
-        let seq_idx = new_fields.len() - 2;
-        let reserved_idx = new_fields.len() - 1;
-
-        let arrow_schema = Arc::new(Schema::new_with_metadata(
-            new_fields,
-            arrow_schema.metadata.clone(),
-        ));
-        Ok(Self {
-            arrow_schema,
-            num_primary_keys,
-            seq_idx,
-            reserved_idx,
-            value_idxes,
-            update_mode,
-        })
-    }
-
-    pub fn is_builtin_field(f: &FieldRef) -> bool {
-        f.name() == SEQ_COLUMN_NAME || f.name() == RESERVED_COLUMN_NAME
-    }
-
-    /// Primary keys and builtin columns are required when query.
-    pub fn fill_required_projections(&self, projection: &mut 
Option<Vec<usize>>) {
-        if let Some(proj) = projection.as_mut() {
-            for i in 0..self.num_primary_keys {
-                if !proj.contains(&i) {
-                    proj.push(i);
-                }
-            }
-            // For builtin columns, reserved column is not used for now,
-            // so only add seq column.
-            if !proj.contains(&self.seq_idx) {
-                proj.push(self.seq_idx);
-            }
-        }
-    }
-
-    /// Builtin columns are always appended to the end of the schema.
-    pub fn fill_builtin_columns(
-        &self,
-        record_batch: RecordBatch,
-        sequence: u64,
-    ) -> Result<RecordBatch> {
-        let num_rows = record_batch.num_rows();
-        if num_rows == 0 {
-            return Ok(record_batch);
-        }
-
-        let mut columns = record_batch.columns().to_vec();
-        let seq_array = UInt64Array::from_iter_values((0..num_rows).map(|_| 
sequence));
-        columns.push(Arc::new(seq_array));
-        let reserved_array = UInt64Array::new_null(num_rows);
-        columns.push(Arc::new(reserved_array));
-
-        let new_batch = RecordBatch::try_new(self.arrow_schema.clone(), 
columns)
-            .context("construct record batch with seq column")?;
-
-        Ok(new_batch)
-    }
-}
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::{arrow_schema, record_batch};
-
-    #[test]
-    fn test_timestamp_truncate_by() {
-        let testcases = [
-            // ts, segment, expected
-            (0, 20, 0),
-            (10, 20, 0),
-            (20, 20, 20),
-            (30, 20, 20),
-            (40, 20, 40),
-            (41, 20, 40),
-        ];
-        for (ts, segment, expected) in testcases {
-            let actual = 
Timestamp::from(ts).truncate_by(Duration::from_millis(segment));
-            assert_eq!(actual.0, expected);
-        }
-    }
-
-    #[test]
-    fn test_build_storage_schema() {
-        let arrow_schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), 
("value", Int64));
-        let schema = StorageSchema::try_new(arrow_schema.clone(), 2, 
UpdateMode::Append).unwrap();
-        assert_eq!(schema.value_idxes, vec![2]);
-        assert_eq!(schema.seq_idx, 3);
-        assert_eq!(schema.reserved_idx, 4);
-
-        // No value column exists
-        assert!(StorageSchema::try_new(arrow_schema, 3, 
UpdateMode::Append).is_err());
-
-        let batch = record_batch!(
-            ("pk1", UInt8, vec![11, 11, 9, 10]),
-            ("pk2", UInt8, vec![100, 99, 1, 2]),
-            ("value", Int64, vec![22, 77, 44, 66])
-        )
-        .unwrap();
-        let sequence = 999;
-        let new_batch = schema.fill_builtin_columns(batch, sequence).unwrap();
-        let expected_batch = record_batch!(
-            ("pk1", UInt8, vec![11, 11, 9, 10]),
-            ("pk2", UInt8, vec![100, 99, 1, 2]),
-            ("value", Int64, vec![22, 77, 44, 66]),
-            (SEQ_COLUMN_NAME, UInt64, vec![sequence; 4]),
-            (RESERVED_COLUMN_NAME, UInt64, vec![None; 4])
-        )
-        .unwrap();
-        assert_eq!(new_batch, expected_batch);
-
-        let mut testcases = [
-            (None, None),
-            (Some(vec![]), Some(vec![0, 1, 3])),
-            (Some(vec![1]), Some(vec![1, 0, 3])),
-            (Some(vec![2]), Some(vec![2, 0, 1, 3])),
-        ];
-        for (input, expected) in testcases.iter_mut() {
-            schema.fill_required_projections(input);
-            assert_eq!(input, expected);
-        }
-    }
+pub struct MetricId(u64);
+pub struct SeriesId(u64);
+
+pub struct Label {
+    pub name: Vec<u8>,
+    pub value: Vec<u8>,
+}
+
+/// This is the main struct used for write, optional values will be filled in
+/// different modules.
+pub struct Sample {
+    pub name: Vec<u8>,
+    pub lables: Vec<Label>,
+    pub timestamp: i64,
+    pub value: f64,
+    /// hash of name
+    pub name_id: Option<MetricId>,
+    /// hash of labels(sorted)
+    pub series_id: Option<SeriesId>,
 }
diff --git a/src/server/Cargo.toml b/src/server/Cargo.toml
index 8136b7b7..c6964978 100644
--- a/src/server/Cargo.toml
+++ b/src/server/Cargo.toml
@@ -31,7 +31,7 @@ arrow = { workspace = true }
 clap = { workspace = true, features = ["derive"] }
 common = { workspace = true }
 futures = { workspace = true }
-metric_engine = { workspace = true }
+horaedb_storage = { workspace = true }
 object_store = { workspace = true }
 rand = "0.8"
 serde = { workspace = true }
diff --git a/src/server/src/config.rs b/src/server/src/config.rs
index 57ebc18b..668a895e 100644
--- a/src/server/src/config.rs
+++ b/src/server/src/config.rs
@@ -83,7 +83,7 @@ impl Default for ThreadConfig {
 #[serde(default, deny_unknown_fields)]
 pub struct StorageConfig {
     pub object_store: ObjectStorageConfig,
-    pub time_merge_storage: metric_engine::config::StorageConfig,
+    pub time_merge_storage: horaedb_storage::config::StorageConfig,
 }
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
diff --git a/src/server/src/main.rs b/src/server/src/main.rs
index 055aa4a0..df154bc3 100644
--- a/src/server/src/main.rs
+++ b/src/server/src/main.rs
@@ -38,7 +38,7 @@ use arrow::{
 };
 use clap::Parser;
 use config::{Config, ObjectStorageConfig};
-use metric_engine::{
+use horaedb_storage::{
     storage::{
         CloudObjectStorage, CompactRequest, StorageRuntimes, 
TimeMergeStorageRef, WriteRequest,
     },
diff --git a/src/metric_engine/Cargo.toml b/src/storage/Cargo.toml
similarity index 91%
copy from src/metric_engine/Cargo.toml
copy to src/storage/Cargo.toml
index 850c5cfe..a6ee671d 100644
--- a/src/metric_engine/Cargo.toml
+++ b/src/storage/Cargo.toml
@@ -16,7 +16,7 @@
 # under the License.
 
 [package]
-name = "metric_engine"
+name = "storage"
 version.workspace = true
 authors.workspace = true
 edition.workspace = true
@@ -32,7 +32,6 @@ async-scoped = { workspace = true }
 async-trait = { workspace = true }
 byteorder = { workspace = true }
 bytes = { workspace = true }
-bytesize = { workspace = true }
 common = { workspace = true }
 datafusion = { workspace = true }
 futures = { workspace = true }
@@ -43,10 +42,8 @@ parquet = { workspace = true, features = ["object_store"] }
 pb_types = { workspace = true }
 prost = { workspace = true }
 serde = { workspace = true }
-thiserror = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
-uuid = { workspace = true, features = ["v4", "fast-rng", "macro-diagnostics"] }
 
 [dev-dependencies]
 temp-dir = { workspace = true }
diff --git a/src/metric_engine/src/compaction/executor.rs 
b/src/storage/src/compaction/executor.rs
similarity index 100%
rename from src/metric_engine/src/compaction/executor.rs
rename to src/storage/src/compaction/executor.rs
diff --git a/src/metric_engine/src/compaction/mod.rs 
b/src/storage/src/compaction/mod.rs
similarity index 100%
rename from src/metric_engine/src/compaction/mod.rs
rename to src/storage/src/compaction/mod.rs
diff --git a/src/metric_engine/src/compaction/picker.rs 
b/src/storage/src/compaction/picker.rs
similarity index 100%
rename from src/metric_engine/src/compaction/picker.rs
rename to src/storage/src/compaction/picker.rs
diff --git a/src/metric_engine/src/compaction/scheduler.rs 
b/src/storage/src/compaction/scheduler.rs
similarity index 100%
rename from src/metric_engine/src/compaction/scheduler.rs
rename to src/storage/src/compaction/scheduler.rs
diff --git a/src/metric_engine/src/config.rs b/src/storage/src/config.rs
similarity index 100%
rename from src/metric_engine/src/config.rs
rename to src/storage/src/config.rs
diff --git a/src/metric_engine/src/lib.rs b/src/storage/src/lib.rs
similarity index 87%
copy from src/metric_engine/src/lib.rs
copy to src/storage/src/lib.rs
index a993fb74..c16e39e4 100644
--- a/src/metric_engine/src/lib.rs
+++ b/src/storage/src/lib.rs
@@ -20,7 +20,6 @@
 #![feature(duration_constructors)]
 mod compaction;
 pub mod config;
-pub mod error;
 mod macros;
 pub mod manifest;
 pub mod operator;
@@ -31,4 +30,7 @@ pub mod storage;
 mod test_util;
 pub mod types;
 
-pub use error::{AnyhowError, Error, Result};
+// Re-export error types.
+pub type AnyhowError = common::AnyhowError;
+pub type Error = common::Error;
+pub type Result<T> = common::Result<T>;
diff --git a/src/metric_engine/src/macros.rs b/src/storage/src/macros.rs
similarity index 100%
rename from src/metric_engine/src/macros.rs
rename to src/storage/src/macros.rs
diff --git a/src/metric_engine/src/manifest/encoding.rs 
b/src/storage/src/manifest/encoding.rs
similarity index 100%
rename from src/metric_engine/src/manifest/encoding.rs
rename to src/storage/src/manifest/encoding.rs
diff --git a/src/metric_engine/src/manifest/mod.rs 
b/src/storage/src/manifest/mod.rs
similarity index 100%
rename from src/metric_engine/src/manifest/mod.rs
rename to src/storage/src/manifest/mod.rs
diff --git a/src/metric_engine/src/operator.rs b/src/storage/src/operator.rs
similarity index 100%
rename from src/metric_engine/src/operator.rs
rename to src/storage/src/operator.rs
diff --git a/src/metric_engine/src/read.rs b/src/storage/src/read.rs
similarity index 100%
rename from src/metric_engine/src/read.rs
rename to src/storage/src/read.rs
diff --git a/src/metric_engine/src/sst.rs b/src/storage/src/sst.rs
similarity index 100%
rename from src/metric_engine/src/sst.rs
rename to src/storage/src/sst.rs
diff --git a/src/metric_engine/src/storage.rs b/src/storage/src/storage.rs
similarity index 100%
rename from src/metric_engine/src/storage.rs
rename to src/storage/src/storage.rs
diff --git a/src/metric_engine/src/test_util.rs b/src/storage/src/test_util.rs
similarity index 100%
rename from src/metric_engine/src/test_util.rs
rename to src/storage/src/test_util.rs
diff --git a/src/metric_engine/src/types.rs b/src/storage/src/types.rs
similarity index 100%
copy from src/metric_engine/src/types.rs
copy to src/storage/src/types.rs


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to