This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 60b52177 feat: impl write procedure scaffold (#1580)
60b52177 is described below
commit 60b52177fe102ac0a0cbf4805348eb70add2f23c
Author: Jiacai Liu <[email protected]>
AuthorDate: Mon Oct 28 17:38:28 2024 +0800
feat: impl write procedure scaffold (#1580)
## Rationale
Implement write procedure for TimeMergeStorage
## Detailed Changes
- Add basic write implementation.
## Test Plan
CI
---
.github/workflows/metric-engine-ci.yml | 4 +
horaedb/Cargo.lock | 390 +++++++++++++++------
horaedb/Cargo.toml | 6 +-
horaedb/Makefile | 6 +-
horaedb/metric_engine/Cargo.toml | 5 +
horaedb/metric_engine/src/error.rs | 1 +
horaedb/metric_engine/src/lib.rs | 2 +-
horaedb/metric_engine/src/manifest.rs | 116 +++++-
horaedb/metric_engine/src/sst.rs | 76 +++-
horaedb/metric_engine/src/storage.rs | 118 ++++++-
horaedb/metric_engine/src/types.rs | 3 +-
horaedb/{Makefile => pb_types/Cargo.toml} | 25 +-
.../src/sst.rs => pb_types/build.rs} | 7 +-
horaedb/pb_types/protos/sst.proto | 50 +++
.../src/sst.rs => pb_types/src/lib.rs} | 6 +-
horaedb/{Makefile => rust-toolchain.toml} | 14 +-
16 files changed, 676 insertions(+), 153 deletions(-)
diff --git a/.github/workflows/metric-engine-ci.yml
b/.github/workflows/metric-engine-ci.yml
index db3bf5bd..be80ebe8 100644
--- a/.github/workflows/metric-engine-ci.yml
+++ b/.github/workflows/metric-engine-ci.yml
@@ -53,6 +53,10 @@ jobs:
- name: Release Disk Quota
run: |
sudo make ensure-disk-quota
+ - name: Setup Build Environment
+ run: |
+ sudo apt update
+ sudo apt install --yes protobuf-compiler
- name: Install check binaries
run: |
rustup component add clippy
diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock
index df1d9e0d..632b74b7 100644
--- a/horaedb/Cargo.lock
+++ b/horaedb/Cargo.lock
@@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
-version = 3
+version = 4
[[package]]
name = "addr2line"
@@ -122,17 +122,17 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "45aef0d9cf9a039bf6cd1acc451b137aca819977b0928dece52bd92811b640ba"
dependencies = [
"arrow-arith 53.0.0",
- "arrow-array 53.0.0",
- "arrow-buffer 53.0.0",
- "arrow-cast 53.0.0",
+ "arrow-array 53.1.0",
+ "arrow-buffer 53.1.0",
+ "arrow-cast 53.1.0",
"arrow-csv 53.0.0",
- "arrow-data 53.0.0",
- "arrow-ipc 53.0.0",
+ "arrow-data 53.1.0",
+ "arrow-ipc 53.1.0",
"arrow-json 53.0.0",
"arrow-ord 53.0.0",
"arrow-row 53.0.0",
- "arrow-schema 53.0.0",
- "arrow-select 53.0.0",
+ "arrow-schema 53.1.0",
+ "arrow-select 53.1.0",
"arrow-string 53.0.0",
]
@@ -157,10 +157,10 @@ version = "53.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03675e42d1560790f3524800e41403b40d0da1c793fe9528929fde06d8c7649a"
dependencies = [
- "arrow-array 53.0.0",
- "arrow-buffer 53.0.0",
- "arrow-data 53.0.0",
- "arrow-schema 53.0.0",
+ "arrow-array 53.1.0",
+ "arrow-buffer 53.1.0",
+ "arrow-data 53.1.0",
+ "arrow-schema 53.1.0",
"chrono",
"half",
"num",
@@ -185,14 +185,14 @@ dependencies = [
[[package]]
name = "arrow-array"
-version = "53.0.0"
+version = "53.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cd2bf348cf9f02a5975c5962c7fa6dee107a2009a7b41ac5fb1a027e12dc033f"
+checksum = "7f16835e8599dbbb1659fd869d865254c4cf32c6c2bb60b6942ac9fc36bfa5da"
dependencies = [
"ahash",
- "arrow-buffer 53.0.0",
- "arrow-data 53.0.0",
- "arrow-schema 53.0.0",
+ "arrow-buffer 53.1.0",
+ "arrow-data 53.1.0",
+ "arrow-schema 53.1.0",
"chrono",
"half",
"hashbrown",
@@ -212,9 +212,9 @@ dependencies = [
[[package]]
name = "arrow-buffer"
-version = "53.0.0"
+version = "53.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3092e37715f168976012ce52273c3989b5793b0db5f06cbaa246be25e5f0924d"
+checksum = "1a1f34f0faae77da6b142db61deba2cb6d60167592b178be317b341440acba80"
dependencies = [
"bytes",
"half",
@@ -237,28 +237,28 @@ dependencies = [
"chrono",
"comfy-table",
"half",
- "lexical-core",
+ "lexical-core 0.8.5",
"num",
"ryu",
]
[[package]]
name = "arrow-cast"
-version = "53.0.0"
+version = "53.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7ce1018bb710d502f9db06af026ed3561552e493e989a79d0d0f5d9cf267a785"
+checksum = "450e4abb5775bca0740bec0bcf1b1a5ae07eff43bd625661c4436d8e8e4540c4"
dependencies = [
- "arrow-array 53.0.0",
- "arrow-buffer 53.0.0",
- "arrow-data 53.0.0",
- "arrow-schema 53.0.0",
- "arrow-select 53.0.0",
+ "arrow-array 53.1.0",
+ "arrow-buffer 53.1.0",
+ "arrow-data 53.1.0",
+ "arrow-schema 53.1.0",
+ "arrow-select 53.1.0",
"atoi",
"base64",
"chrono",
"comfy-table",
"half",
- "lexical-core",
+ "lexical-core 1.0.2",
"num",
"ryu",
]
@@ -278,7 +278,7 @@ dependencies = [
"csv",
"csv-core",
"lazy_static",
- "lexical-core",
+ "lexical-core 0.8.5",
"regex",
]
@@ -288,16 +288,16 @@ version = "53.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd178575f45624d045e4ebee714e246a05d9652e41363ee3f57ec18cca97f740"
dependencies = [
- "arrow-array 53.0.0",
- "arrow-buffer 53.0.0",
- "arrow-cast 53.0.0",
- "arrow-data 53.0.0",
- "arrow-schema 53.0.0",
+ "arrow-array 53.1.0",
+ "arrow-buffer 53.1.0",
+ "arrow-cast 53.1.0",
+ "arrow-data 53.1.0",
+ "arrow-schema 53.1.0",
"chrono",
"csv",
"csv-core",
"lazy_static",
- "lexical-core",
+ "lexical-core 0.8.5",
"regex",
]
@@ -315,12 +315,12 @@ dependencies = [
[[package]]
name = "arrow-data"
-version = "53.0.0"
+version = "53.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4e4ac0c4ee79150afe067dc4857154b3ee9c1cd52b5f40d59a77306d0ed18d65"
+checksum = "2b1e618bbf714c7a9e8d97203c806734f012ff71ae3adc8ad1b075689f540634"
dependencies = [
- "arrow-buffer 53.0.0",
- "arrow-schema 53.0.0",
+ "arrow-buffer 53.1.0",
+ "arrow-schema 53.1.0",
"half",
"num",
]
@@ -342,15 +342,15 @@ dependencies = [
[[package]]
name = "arrow-ipc"
-version = "53.0.0"
+version = "53.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bb307482348a1267f91b0912e962cd53440e5de0f7fb24c5f7b10da70b38c94a"
+checksum = "f98e983549259a2b97049af7edfb8f28b8911682040e99a94e4ceb1196bd65c2"
dependencies = [
- "arrow-array 53.0.0",
- "arrow-buffer 53.0.0",
- "arrow-cast 53.0.0",
- "arrow-data 53.0.0",
- "arrow-schema 53.0.0",
+ "arrow-array 53.1.0",
+ "arrow-buffer 53.1.0",
+ "arrow-cast 53.1.0",
+ "arrow-data 53.1.0",
+ "arrow-schema 53.1.0",
"flatbuffers",
]
@@ -368,7 +368,7 @@ dependencies = [
"chrono",
"half",
"indexmap",
- "lexical-core",
+ "lexical-core 0.8.5",
"num",
"serde",
"serde_json",
@@ -380,15 +380,15 @@ version = "53.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d24805ba326758effdd6f2cbdd482fcfab749544f21b134701add25b33f474e6"
dependencies = [
- "arrow-array 53.0.0",
- "arrow-buffer 53.0.0",
- "arrow-cast 53.0.0",
- "arrow-data 53.0.0",
- "arrow-schema 53.0.0",
+ "arrow-array 53.1.0",
+ "arrow-buffer 53.1.0",
+ "arrow-cast 53.1.0",
+ "arrow-data 53.1.0",
+ "arrow-schema 53.1.0",
"chrono",
"half",
"indexmap",
- "lexical-core",
+ "lexical-core 0.8.5",
"num",
"serde",
"serde_json",
@@ -415,11 +415,11 @@ version = "53.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "644046c479d80ae8ed02a7f1e1399072ea344ca6a7b0e293ab2d5d9ed924aa3b"
dependencies = [
- "arrow-array 53.0.0",
- "arrow-buffer 53.0.0",
- "arrow-data 53.0.0",
- "arrow-schema 53.0.0",
- "arrow-select 53.0.0",
+ "arrow-array 53.1.0",
+ "arrow-buffer 53.1.0",
+ "arrow-data 53.1.0",
+ "arrow-schema 53.1.0",
+ "arrow-select 53.1.0",
"half",
"num",
]
@@ -445,10 +445,10 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "a29791f8eb13b340ce35525b723f5f0df17ecb955599e11f65c2a94ab34e2efb"
dependencies = [
"ahash",
- "arrow-array 53.0.0",
- "arrow-buffer 53.0.0",
- "arrow-data 53.0.0",
- "arrow-schema 53.0.0",
+ "arrow-array 53.1.0",
+ "arrow-buffer 53.1.0",
+ "arrow-data 53.1.0",
+ "arrow-schema 53.1.0",
"half",
]
@@ -460,9 +460,9 @@ checksum =
"9e972cd1ff4a4ccd22f86d3e53e835c2ed92e0eea6a3e8eadb72b4f1ac802cf8"
[[package]]
name = "arrow-schema"
-version = "53.0.0"
+version = "53.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c85320a3a2facf2b2822b57aa9d6d9d55edb8aee0b6b5d3b8df158e503d10858"
+checksum = "fbf0388a18fd7f7f3fe3de01852d30f54ed5182f9004db700fbe3ba843ed2794"
[[package]]
name = "arrow-select"
@@ -480,15 +480,15 @@ dependencies = [
[[package]]
name = "arrow-select"
-version = "53.0.0"
+version = "53.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9cc7e6b582e23855fd1625ce46e51647aa440c20ea2e71b1d748e0839dd73cba"
+checksum = "b83e5723d307a38bf00ecd2972cd078d1339c7fd3eb044f609958a9a24463f3a"
dependencies = [
"ahash",
- "arrow-array 53.0.0",
- "arrow-buffer 53.0.0",
- "arrow-data 53.0.0",
- "arrow-schema 53.0.0",
+ "arrow-array 53.1.0",
+ "arrow-buffer 53.1.0",
+ "arrow-data 53.1.0",
+ "arrow-schema 53.1.0",
"num",
]
@@ -515,11 +515,11 @@ version = "53.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0775b6567c66e56ded19b87a954b6b1beffbdd784ef95a3a2b03f59570c1d230"
dependencies = [
- "arrow-array 53.0.0",
- "arrow-buffer 53.0.0",
- "arrow-data 53.0.0",
- "arrow-schema 53.0.0",
- "arrow-select 53.0.0",
+ "arrow-array 53.1.0",
+ "arrow-buffer 53.1.0",
+ "arrow-data 53.1.0",
+ "arrow-schema 53.1.0",
+ "arrow-select 53.1.0",
"memchr",
"num",
"regex",
@@ -552,7 +552,7 @@ checksum =
"a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.82",
]
[[package]]
@@ -905,7 +905,7 @@ dependencies = [
"num_cpus",
"object_store 0.10.2",
"parking_lot",
- "parquet",
+ "parquet 52.2.0",
"paste",
"pin-project-lite",
"rand",
@@ -951,7 +951,7 @@ dependencies = [
"libc",
"num_cpus",
"object_store 0.10.2",
- "parquet",
+ "parquet 52.2.0",
"sqlparser",
]
@@ -1334,7 +1334,7 @@ checksum =
"87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.82",
]
[[package]]
@@ -1583,11 +1583,24 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46"
dependencies = [
- "lexical-parse-float",
- "lexical-parse-integer",
- "lexical-util",
- "lexical-write-float",
- "lexical-write-integer",
+ "lexical-parse-float 0.8.5",
+ "lexical-parse-integer 0.8.6",
+ "lexical-util 0.8.5",
+ "lexical-write-float 0.8.5",
+ "lexical-write-integer 0.8.5",
+]
+
+[[package]]
+name = "lexical-core"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0431c65b318a590c1de6b8fd6e72798c92291d27762d94c9e6c37ed7a73d8458"
+dependencies = [
+ "lexical-parse-float 1.0.2",
+ "lexical-parse-integer 1.0.2",
+ "lexical-util 1.0.3",
+ "lexical-write-float 1.0.2",
+ "lexical-write-integer 1.0.2",
]
[[package]]
@@ -1596,8 +1609,19 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f"
dependencies = [
- "lexical-parse-integer",
- "lexical-util",
+ "lexical-parse-integer 0.8.6",
+ "lexical-util 0.8.5",
+ "static_assertions",
+]
+
+[[package]]
+name = "lexical-parse-float"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eb17a4bdb9b418051aa59d41d65b1c9be5affab314a872e5ad7f06231fb3b4e0"
+dependencies = [
+ "lexical-parse-integer 1.0.2",
+ "lexical-util 1.0.3",
"static_assertions",
]
@@ -1607,7 +1631,17 @@ version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9"
dependencies = [
- "lexical-util",
+ "lexical-util 0.8.5",
+ "static_assertions",
+]
+
+[[package]]
+name = "lexical-parse-integer"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5df98f4a4ab53bf8b175b363a34c7af608fe31f93cc1fb1bf07130622ca4ef61"
+dependencies = [
+ "lexical-util 1.0.3",
"static_assertions",
]
@@ -1620,14 +1654,34 @@ dependencies = [
"static_assertions",
]
+[[package]]
+name = "lexical-util"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "85314db53332e5c192b6bca611fb10c114a80d1b831ddac0af1e9be1b9232ca0"
+dependencies = [
+ "static_assertions",
+]
+
[[package]]
name = "lexical-write-float"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862"
dependencies = [
- "lexical-util",
- "lexical-write-integer",
+ "lexical-util 0.8.5",
+ "lexical-write-integer 0.8.5",
+ "static_assertions",
+]
+
+[[package]]
+name = "lexical-write-float"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e7c3ad4e37db81c1cbe7cf34610340adc09c322871972f74877a712abc6c809"
+dependencies = [
+ "lexical-util 1.0.3",
+ "lexical-write-integer 1.0.2",
"static_assertions",
]
@@ -1637,7 +1691,17 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446"
dependencies = [
- "lexical-util",
+ "lexical-util 0.8.5",
+ "static_assertions",
+]
+
+[[package]]
+name = "lexical-write-integer"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eb89e9f6958b83258afa3deed90b5de9ef68eef090ad5086c791cd2345610162"
+dependencies = [
+ "lexical-util 1.0.3",
"static_assertions",
]
@@ -1695,6 +1759,10 @@ dependencies = [
"pkg-config",
]
+[[package]]
+name = "macros"
+version = "2.1.0"
+
[[package]]
name = "md-5"
version = "0.10.6"
@@ -1718,11 +1786,16 @@ dependencies = [
"anyhow",
"arrow 53.0.0",
"async-trait",
+ "bytes",
"datafusion",
"futures",
"itertools 0.3.25",
"lazy_static",
+ "macros",
"object_store 0.11.0",
+ "parquet 53.1.0",
+ "pb_types",
+ "prost",
"thiserror",
"tokio",
]
@@ -1748,6 +1821,12 @@ dependencies = [
"windows-sys 0.52.0",
]
+[[package]]
+name = "multimap"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
+
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@@ -1973,6 +2052,42 @@ dependencies = [
"zstd-sys",
]
+[[package]]
+name = "parquet"
+version = "53.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "310c46a70a3ba90d98fec39fa2da6d9d731e544191da6fb56c9d199484d0dd3e"
+dependencies = [
+ "ahash",
+ "arrow-array 53.1.0",
+ "arrow-buffer 53.1.0",
+ "arrow-cast 53.1.0",
+ "arrow-data 53.1.0",
+ "arrow-ipc 53.1.0",
+ "arrow-schema 53.1.0",
+ "arrow-select 53.1.0",
+ "base64",
+ "brotli",
+ "bytes",
+ "chrono",
+ "flate2",
+ "futures",
+ "half",
+ "hashbrown",
+ "lz4_flex",
+ "num",
+ "num-bigint",
+ "object_store 0.11.0",
+ "paste",
+ "seq-macro",
+ "snap",
+ "thrift",
+ "tokio",
+ "twox-hash",
+ "zstd",
+ "zstd-sys",
+]
+
[[package]]
name = "parse-zoneinfo"
version = "0.3.1"
@@ -1988,6 +2103,14 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
+[[package]]
+name = "pb_types"
+version = "2.0.0"
+dependencies = [
+ "prost",
+ "prost-build",
+]
+
[[package]]
name = "percent-encoding"
version = "2.3.1"
@@ -2069,6 +2192,16 @@ dependencies = [
"zerocopy",
]
+[[package]]
+name = "prettyplease"
+version = "0.2.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "910d41a655dac3b764f1ade94821093d3610248694320cd072303a8eedcf221d"
+dependencies = [
+ "proc-macro2",
+ "syn 2.0.82",
+]
+
[[package]]
name = "proc-macro2"
version = "1.0.86"
@@ -2078,6 +2211,59 @@ dependencies = [
"unicode-ident",
]
+[[package]]
+name = "prost"
+version = "0.13.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f"
+dependencies = [
+ "bytes",
+ "prost-derive",
+]
+
+[[package]]
+name = "prost-build"
+version = "0.13.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
+dependencies = [
+ "bytes",
+ "heck 0.5.0",
+ "itertools 0.13.0",
+ "log",
+ "multimap",
+ "once_cell",
+ "petgraph",
+ "prettyplease",
+ "prost",
+ "prost-types",
+ "regex",
+ "syn 2.0.82",
+ "tempfile",
+]
+
+[[package]]
+name = "prost-derive"
+version = "0.13.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5"
+dependencies = [
+ "anyhow",
+ "itertools 0.13.0",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.82",
+]
+
+[[package]]
+name = "prost-types"
+version = "0.13.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670"
+dependencies = [
+ "prost",
+]
+
[[package]]
name = "quote"
version = "1.0.37"
@@ -2239,7 +2425,7 @@ checksum =
"243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.82",
]
[[package]]
@@ -2350,7 +2536,7 @@ dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.82",
]
[[package]]
@@ -2387,7 +2573,7 @@ checksum =
"01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.82",
]
[[package]]
@@ -2415,7 +2601,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
- "syn 2.0.77",
+ "syn 2.0.82",
]
[[package]]
@@ -2437,9 +2623,9 @@ dependencies = [
[[package]]
name = "syn"
-version = "2.0.77"
+version = "2.0.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed"
+checksum = "83540f837a8afc019423a8edb95b52a8effe46957ee402287f4292fae35be021"
dependencies = [
"proc-macro2",
"quote",
@@ -2476,7 +2662,7 @@ checksum =
"a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.82",
]
[[package]]
@@ -2550,7 +2736,7 @@ checksum =
"693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.82",
]
[[package]]
@@ -2585,7 +2771,7 @@ checksum =
"34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.82",
]
[[package]]
@@ -2742,7 +2928,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.82",
"wasm-bindgen-shared",
]
@@ -2764,7 +2950,7 @@ checksum =
"afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.82",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@@ -2934,7 +3120,7 @@ checksum =
"fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.77",
+ "syn 2.0.82",
]
[[package]]
diff --git a/horaedb/Cargo.toml b/horaedb/Cargo.toml
index b3048602..ee231cbe 100644
--- a/horaedb/Cargo.toml
+++ b/horaedb/Cargo.toml
@@ -23,15 +23,19 @@ license = "Apache-2.0"
[workspace]
resolver = "2"
-members = ["metric_engine", "server"]
+members = ["metric_engine", "pb_types", "server"]
[workspace.dependencies]
anyhow = { version = "1.0" }
metric_engine = { path = "metric_engine" }
thiserror = "1"
+bytes = "1"
datafusion = "41"
parquet = { version = "53" }
object_store = { version = "0.11" }
+macros = { path = "../src/components/macros" }
+pb_types = { path = "pb_types" }
+prost = { version = "0.13" }
arrow = { version = "53", features = ["prettyprint"] }
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
diff --git a/horaedb/Makefile b/horaedb/Makefile
index 908a8cb8..72736b0a 100644
--- a/horaedb/Makefile
+++ b/horaedb/Makefile
@@ -19,10 +19,14 @@ SHELL = /bin/bash
clippy:
cargo clippy --all-targets --all-features -- -D warnings \
- -A dead_code -A unused_variables # Remove these once we have a clean
build
+ -A dead_code -A unused_variables -A clippy::unreachable # Remove these
once we have a clean build
sort:
cargo sort --workspace --check
fmt:
cargo fmt -- --check
+
+fix:
+ cargo fmt
+ cargo sort --workspace
diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml
index a29abb8c..d2ea85c8 100644
--- a/horaedb/metric_engine/Cargo.toml
+++ b/horaedb/metric_engine/Cargo.toml
@@ -34,10 +34,15 @@ workspace = true
anyhow = { workspace = true }
arrow = { workspace = true }
async-trait = { workspace = true }
+bytes = { workspace = true }
datafusion = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
+macros = { workspace = true }
object_store = { workspace = true }
+parquet = { workspace = true, features = ["object_store"] }
+pb_types = { workspace = true }
+prost = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
diff --git a/horaedb/metric_engine/src/error.rs
b/horaedb/metric_engine/src/error.rs
index 08e720f4..35b8e9ae 100644
--- a/horaedb/metric_engine/src/error.rs
+++ b/horaedb/metric_engine/src/error.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+pub use anyhow::Error as AnyhowError;
use thiserror::Error;
#[derive(Error, Debug)]
diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/lib.rs
index 1a0149db..8a05223c 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/lib.rs
@@ -23,4 +23,4 @@ mod sst;
pub mod storage;
pub mod types;
-pub use error::{Error, Result};
+pub use error::{AnyhowError, Error, Result};
diff --git a/horaedb/metric_engine/src/manifest.rs
b/horaedb/metric_engine/src/manifest.rs
index 48391267..aceac3da 100644
--- a/horaedb/metric_engine/src/manifest.rs
+++ b/horaedb/metric_engine/src/manifest.rs
@@ -15,11 +15,119 @@
// specific language governing permissions and limitations
// under the License.
-pub struct Manifest {}
+use anyhow::Context;
+use bytes::Bytes;
+use object_store::{path::Path, PutPayload};
+use prost::Message;
+use tokio::sync::RwLock;
+
+use crate::{
+ sst::{FileId, FileMeta, SstFile},
+ types::ObjectStoreRef,
+ AnyhowError, Error, Result,
+};
+
+pub const PREFIX_PATH: &str = "manifest";
+pub const SNAPSHOT_FILENAME: &str = "snapshot";
+
+/// Manifest of a storage: the list of SST files, persisted as a protobuf
+/// snapshot object in the object store.
+pub struct Manifest {
+    /// Prefix under which manifest objects live.
+    path: String,
+    /// Location of the snapshot object: `{path}/snapshot`.
+    snapshot_path: Path,
+    store: ObjectStoreRef,
+
+    /// In-memory copy of the snapshot. `add_file` holds the write lock for
+    /// the whole update, so concurrent updates are serialized.
+    payload: RwLock<Payload>,
+}
+
+/// Decoded contents of the manifest snapshot.
+pub struct Payload {
+    files: Vec<SstFile>,
+}
+
+impl TryFrom<pb_types::Manifest> for Payload {
+    type Error = Error;
+
+    /// Converts every protobuf `SstFile`, failing on the first invalid one.
+    fn try_from(value: pb_types::Manifest) -> Result<Self> {
+        let files = value
+            .files
+            .into_iter()
+            .map(SstFile::try_from)
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Self { files })
+    }
+}
impl Manifest {
- pub fn new(id: u64) -> Self {
- // Recover the manifest using the id from storage.
- Self {}
+    /// Loads the manifest stored under `path`.
+    ///
+    /// A missing snapshot yields an empty manifest. Any other object store
+    /// error, or a snapshot that fails to decode or convert, is returned as
+    /// an error.
+    pub async fn try_new(path: String, store: ObjectStoreRef) -> Result<Self> {
+        let snapshot_path = Path::from(format!("{path}/{SNAPSHOT_FILENAME}"));
+        let payload = match store.get(&snapshot_path).await {
+            Ok(v) => {
+                let bytes = v
+                    .bytes()
+                    .await
+                    .context("failed to read manifest snapshot")?;
+                let pb_payload = pb_types::Manifest::decode(bytes)
+                    .context("failed to decode manifest snapshot")?;
+                Payload::try_from(pb_payload)?
+            }
+            // No snapshot yet: this is a fresh storage. Match on the error
+            // variant, not on its Display text, so unrelated errors that
+            // happen to mention "not found" are not swallowed.
+            Err(object_store::Error::NotFound { .. }) => Payload { files: vec![] },
+            Err(err) => {
+                let context = format!("Failed to get manifest snapshot, path:{snapshot_path}");
+                return Err(AnyhowError::new(err).context(context).into());
+            }
+        };
+
+        Ok(Self {
+            path,
+            snapshot_path,
+            store,
+            payload: RwLock::new(payload),
+        })
+    }
+
+    /// Registers a newly written SST file.
+    ///
+    /// The updated snapshot is uploaded first. The in-memory payload changes
+    /// only after the upload succeeds, so a failed upload leaves the cached
+    /// state untouched.
+    // TODO: This function is poorly implemented for now: we append the new
+    // sst to the snapshot and upload the whole snapshot again. A more
+    // efficient way is to write a small diff file and merge diff files into
+    // `snapshot` by compaction in the background.
+    pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> {
+        let mut payload = self.payload.write().await;
+        let new_sst = SstFile { id, meta };
+        // Encode the existing files plus the new one without cloning the list.
+        let pb_manifest = pb_types::Manifest {
+            files: payload
+                .files
+                .iter()
+                .chain(std::iter::once(&new_sst))
+                .map(|f| pb_types::SstFile {
+                    id: f.id,
+                    meta: Some(pb_types::SstMeta {
+                        max_sequence: f.meta.max_sequence,
+                        num_rows: f.meta.num_rows,
+                        time_range: Some(pb_types::TimeRange {
+                            start: f.meta.time_range.start,
+                            end: f.meta.time_range.end,
+                        }),
+                    }),
+                })
+                .collect::<Vec<_>>(),
+        };
+
+        let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
+        pb_manifest
+            .encode(&mut buf)
+            .context("failed to encode manifest")?;
+        let put_payload = PutPayload::from_bytes(Bytes::from(buf));
+
+        // 1. Persist the snapshot
+        self.store
+            .put(&self.snapshot_path, put_payload)
+            .await
+            .context("Failed to update manifest")?;
+
+        // 2. Update cached payload
+        payload.files.push(new_sst);
+
+        Ok(())
}
}
diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/metric_engine/src/sst.rs
index 37cc8f11..5eb96867 100644
--- a/horaedb/metric_engine/src/sst.rs
+++ b/horaedb/metric_engine/src/sst.rs
@@ -15,6 +15,78 @@
// specific language governing permissions and limitations
// under the License.
-pub struct SSTable {
- pub id: u64,
+use std::{
+ sync::{
+ atomic::{AtomicU64, Ordering},
+ LazyLock,
+ },
+ time::SystemTime,
+};
+
+use macros::ensure;
+
+use crate::{types::TimeRange, Error};
+
+pub const PREFIX_PATH: &str = "data";
+
+pub type FileId = u64;
+
+/// An SST file registered in the manifest: its id plus metadata.
+#[derive(Clone, Debug)]
+pub struct SstFile {
+    pub id: FileId,
+    pub meta: FileMeta,
+}
+
+impl TryFrom<pb_types::SstFile> for SstFile {
+    type Error = Error;
+
+    /// Converts a decoded protobuf `SstFile`. Fails if `meta` is absent or
+    /// cannot itself be converted.
+    fn try_from(value: pb_types::SstFile) -> Result<Self, Self::Error> {
+        ensure!(value.meta.is_some(), "file meta is missing");
+        let meta = value.meta.expect("presence checked by ensure! above");
+        let meta = FileMeta::try_from(meta)?;
+
+        Ok(Self { id: value.id, meta })
+    }
+}
+
+/// Metadata of an SST file, as stored in the manifest.
+#[derive(Clone, Debug)]
+pub struct FileMeta {
+    /// Maximum sequence of the data in this file.
+    pub max_sequence: u64,
+    pub num_rows: u32,
+    /// Time range covered by the rows of this file.
+    pub time_range: TimeRange,
+}
+
+impl TryFrom<pb_types::SstMeta> for FileMeta {
+    type Error = Error;
+
+    /// Converts a decoded protobuf `SstMeta`. Fails if `time_range` is absent.
+    fn try_from(value: pb_types::SstMeta) -> Result<Self, Self::Error> {
+        ensure!(value.time_range.is_some(), "time range is missing");
+        let time_range = value
+            .time_range
+            .expect("presence checked by ensure! above");
+
+        Ok(Self {
+            max_sequence: value.max_sequence,
+            num_rows: value.num_rows,
+            time_range: TimeRange {
+                start: time_range.start,
+                end: time_range.end,
+            },
+        })
+    }
+}
+
+// Used for sst file id allocation.
+// Ids start at the current wall-clock time in nanoseconds and then increase
+// by one per allocation. They must not go backwards across restarts, or file
+// ids may collide, so do not move the server clock backwards between restarts.
+static NEXT_ID: LazyLock<AtomicU64> = LazyLock::new(|| {
+    let now_nanos = SystemTime::now()
+        .duration_since(SystemTime::UNIX_EPOCH)
+        .expect("system clock is set before the UNIX epoch")
+        .as_nanos();
+    // Truncation to u64 only matters after roughly the year 2554.
+    AtomicU64::new(now_nanos as u64)
+});
+
+/// Allocates a new SST file id, unique within this process.
+pub fn allocate_id() -> FileId {
+    NEXT_ID.fetch_add(1, Ordering::SeqCst)
}
diff --git a/horaedb/metric_engine/src/storage.rs
b/horaedb/metric_engine/src/storage.rs
index 1cae2bb9..4c5b2667 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -15,19 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-use arrow::{array::RecordBatch, datatypes::Schema};
+use anyhow::Context;
+use arrow::{
+ array::{Int64Array, RecordBatch},
+ datatypes::SchemaRef,
+};
use async_trait::async_trait;
use datafusion::logical_expr::Expr;
+use macros::ensure;
+use object_store::path::Path;
+use parquet::{
+ arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
+ file::properties::WriterProperties,
+};
use crate::{
manifest::Manifest,
- sst::SSTable,
- types::{ObjectStoreRef, SendableRecordBatchStream, TimeRange},
+ sst::{allocate_id, FileId, FileMeta},
+ types::{ObjectStoreRef, SendableRecordBatchStream, TimeRange, Timestamp},
Result,
};
+/// Request to write one record batch into the storage as a new SST file.
pub struct WriteRequest {
+    /// Rows to write; the batch schema must equal the storage schema.
batch: RecordBatch,
+    /// Parquet writer properties, passed as-is to `AsyncArrowWriter::try_new`.
+ props: Option<WriterProperties>,
}
pub struct ScanRequest {
@@ -42,7 +53,7 @@ pub struct CompactRequest {}
/// Time-aware merge storage interface.
#[async_trait]
pub trait TimeMergeStorage {
- fn schema(&self) -> Result<&Schema>;
+ fn schema(&self) -> &SchemaRef;
async fn write(&self, req: WriteRequest) -> Result<()>;
@@ -53,35 +64,106 @@ pub trait TimeMergeStorage {
async fn compact(&self, req: CompactRequest) -> Result<()>;
}
-/// TMStorage implementation using cloud object storage.
+/// `TimeMergeStorage` implementation using cloud object storage.
pub struct CloudObjectStorage {
- name: String,
- id: u64,
+    /// Root path under which manifest and data objects are stored.
+    path: String,
store: ObjectStoreRef,
- sstables: Vec<SSTable>,
+    arrow_schema: SchemaRef,
+    /// Position of the `Int64` timestamp column in `arrow_schema`.
+    timestamp_index: usize,
manifest: Manifest,
}
+/// It will organize the data in the following way:
+/// ```plaintext
+/// {root_path}/manifest/snapshot
+/// {root_path}/manifest/timestamp1   (diff files, not written yet)
+/// {root_path}/manifest/timestamp2
+/// {root_path}/manifest/...
+/// {root_path}/data/{file_id_a}
+/// {root_path}/data/{file_id_b}
+/// {root_path}/data/...
+/// ```
impl CloudObjectStorage {
- pub fn new(name: String, id: u64, store: ObjectStoreRef) -> Self {
- Self {
- name,
- id,
+    /// Opens the storage rooted at `root_path` and loads its manifest.
+    ///
+    /// `timestamp_index` is the position of the timestamp column in
+    /// `arrow_schema`. An out-of-range index is rejected here.
+    pub async fn try_new(
+        root_path: String,
+        store: ObjectStoreRef,
+        arrow_schema: SchemaRef,
+        timestamp_index: usize,
+    ) -> Result<Self> {
+        // Validate up front; `write` would otherwise panic when it indexes
+        // the batch columns with this value.
+        ensure!(
+            timestamp_index < arrow_schema.fields().len(),
+            "timestamp index out of range"
+        );
+        let manifest_prefix = crate::manifest::PREFIX_PATH;
+        let manifest =
+            Manifest::try_new(format!("{root_path}/{manifest_prefix}"), store.clone()).await?;
+        Ok(Self {
+            path: root_path,
+            timestamp_index,
store,
- sstables: Vec::new(),
- manifest: Manifest::new(id),
- }
+            arrow_schema,
+            manifest,
+        })
+    }
+
+    /// Object store path of the data file with the given id:
+    /// `{root_path}/data/{id}`.
+    fn build_file_path(&self, id: FileId) -> String {
+        let root = &self.path;
+        let prefix = crate::sst::PREFIX_PATH;
+        format!("{root}/{prefix}/{id}")
+    }
+
+    /// Writes `req.batch` as a new parquet file and returns its allocated id.
+    async fn write_batch(&self, req: WriteRequest) -> Result<FileId> {
+        let file_id = allocate_id();
+        let file_path = self.build_file_path(file_id);
+        let object_store_writer =
+            ParquetObjectWriter::new(self.store.clone(), Path::from(file_path));
+        let mut writer =
+            AsyncArrowWriter::try_new(object_store_writer, self.schema().clone(), req.props)
+                .context("create arrow writer")?;
+
+        // TODO: sort record batch according to primary key columns.
+        writer
+            .write(&req.batch)
+            .await
+            .context("write arrow batch")?;
+        writer.close().await.context("close arrow writer")?;
+
+        Ok(file_id)
}
}
#[async_trait]
impl TimeMergeStorage for CloudObjectStorage {
- fn schema(&self) -> Result<&Schema> {
- todo!()
+    /// Returns the arrow schema that every written batch must match.
+ fn schema(&self) -> &SchemaRef {
+ &self.arrow_schema
}
async fn write(&self, req: WriteRequest) -> Result<()> {
- todo!()
+        ensure!(req.batch.schema_ref().eq(self.schema()), "schema not match");
+
+        let num_rows = req.batch.num_rows();
+        // An empty batch has no data. Writing it would create an empty file
+        // and record an inverted time range (start = MAX, end = MIN + 1).
+        if num_rows == 0 {
+            return Ok(());
+        }
+        let num_rows = u32::try_from(num_rows).context("too many rows in one batch")?;
+        let time_column = req
+            .batch
+            .column(self.timestamp_index)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .context("timestamp column should be int64")?;
+        // `values()` also exposes the slots of null entries, whose contents
+        // are arbitrary. Reject nulls so they cannot skew the time range.
+        ensure!(
+            time_column.null_count() == 0,
+            "timestamp column should not contain null"
+        );
+
+        let (start, end) = time_column
+            .values()
+            .iter()
+            .fold((Timestamp::MAX, Timestamp::MIN), |(lo, hi), &v| {
+                (lo.min(v), hi.max(v))
+            });
+        // The range is half-open, so `end` is one past the largest timestamp.
+        let end = end
+            .checked_add(1)
+            .context("timestamp overflows the time range")?;
+        let time_range = TimeRange { start, end };
+        let file_id = self.write_batch(req).await?;
+        let file_meta = FileMeta {
+            // Since file ids are allocated in increasing order, we can use
+            // them as sequence.
+            max_sequence: file_id,
+            num_rows,
+            time_range,
+        };
+        self.manifest.add_file(file_id, file_meta).await?;
+
+        Ok(())
}
async fn scan(&self, req: ScanRequest) ->
Result<SendableRecordBatchStream> {
diff --git a/horaedb/metric_engine/src/types.rs
b/horaedb/metric_engine/src/types.rs
index 08d42fcd..96a4b74a 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -23,7 +23,8 @@ use object_store::ObjectStore;
use crate::error::Result;
-pub type TimeRange = Range<i64>;
+/// Timestamp value as stored in the timestamp column.
+/// NOTE(review): the unit (e.g. milliseconds) is not fixed here — confirm.
+pub type Timestamp = i64;
+/// Half-open time range `[start, end)`: `start` inclusive, `end` exclusive.
+pub type TimeRange = Range<Timestamp>;
pub type ObjectStoreRef = Arc<dyn ObjectStore>;
diff --git a/horaedb/Makefile b/horaedb/pb_types/Cargo.toml
similarity index 74%
copy from horaedb/Makefile
copy to horaedb/pb_types/Cargo.toml
index 908a8cb8..e6929fa0 100644
--- a/horaedb/Makefile
+++ b/horaedb/pb_types/Cargo.toml
@@ -15,14 +15,23 @@
# specific language governing permissions and limitations
# under the License.
-SHELL = /bin/bash
+[package]
+name = "pb_types"
-clippy:
- cargo clippy --all-targets --all-features -- -D warnings \
- -A dead_code -A unused_variables # Remove these once we have a clean
build
+[package.license]
+workspace = true
-sort:
- cargo sort --workspace --check
+[package.version]
+workspace = true
-fmt:
- cargo fmt -- --check
+[package.authors]
+workspace = true
+
+[package.edition]
+workspace = true
+
+[dependencies]
+prost = { workspace = true }
+
+[build-dependencies]
+prost-build = { version = "0.13" }
diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/pb_types/build.rs
similarity index 86%
copy from horaedb/metric_engine/src/sst.rs
copy to horaedb/pb_types/build.rs
index 37cc8f11..7eb68464 100644
--- a/horaedb/metric_engine/src/sst.rs
+++ b/horaedb/pb_types/build.rs
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-pub struct SSTable {
- pub id: u64,
+use std::io::Result;
+
+/// Build script: generates Rust types from `protos/sst.proto` with
+/// `prost-build`, resolving proto imports relative to `protos/`.
+fn main() -> Result<()> {
+ prost_build::compile_protos(&["protos/sst.proto"], &["protos/"])
}
diff --git a/horaedb/pb_types/protos/sst.proto
b/horaedb/pb_types/protos/sst.proto
new file mode 100644
index 00000000..ce3db301
--- /dev/null
+++ b/horaedb/pb_types/protos/sst.proto
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+syntax = "proto3";
+
+package pb_types.sst;
+
+// Time range of [start, end)
+message TimeRange {
+ // inclusive
+ int64 start = 1;
+ // exclusive
+ int64 end = 2;
+}
+
+// Metadata of a single SST file.
+message SstMeta {
+ // NOTE(review): presumably the highest write sequence contained in the
+ // file; the current writer stores the (increasingly allocated) file id.
+ uint64 max_sequence = 1;
+ // Number of rows in the file.
+ uint32 num_rows = 2;
+ // Half-open range covering the timestamps of the file's rows.
+ TimeRange time_range = 3;
+}
+
+// An SST file: its id plus its metadata.
+message SstFile {
+ uint64 id = 1;
+ SstMeta meta = 2;
+}
+
+// NOTE(review): presumably the full set of SST files currently tracked.
+message Manifest {
+ repeated SstFile files = 1;
+}
+
+// NOTE(review): presumably a delta on `Manifest`: files to add, and ids of
+// files to remove.
+message MetaUpdate {
+ repeated SstFile to_adds = 1;
+ repeated uint64 to_removes = 2;
+}
diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/pb_types/src/lib.rs
similarity index 89%
copy from horaedb/metric_engine/src/sst.rs
copy to horaedb/pb_types/src/lib.rs
index 37cc8f11..bfa215b0 100644
--- a/horaedb/metric_engine/src/sst.rs
+++ b/horaedb/pb_types/src/lib.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-pub struct SSTable {
- pub id: u64,
+/// Rust types generated by `prost-build` in `build.rs` from the
+/// `pb_types.sst` protobuf package (`protos/sst.proto`).
+mod pb_types {
+ include!(concat!(env!("OUT_DIR"), "/pb_types.sst.rs"));
}
+
+// Re-export the generated messages at the crate root.
+pub use pb_types::*;
diff --git a/horaedb/Makefile b/horaedb/rust-toolchain.toml
similarity index 77%
copy from horaedb/Makefile
copy to horaedb/rust-toolchain.toml
index 908a8cb8..4c621ca8 100644
--- a/horaedb/Makefile
+++ b/horaedb/rust-toolchain.toml
@@ -15,14 +15,6 @@
# specific language governing permissions and limitations
# under the License.
-SHELL = /bin/bash
-
-clippy:
- cargo clippy --all-targets --all-features -- -D warnings \
- -A dead_code -A unused_variables # Remove these once we have a clean
build
-
-sort:
- cargo sort --workspace --check
-
-fmt:
- cargo fmt -- --check
+[toolchain]
+channel = "nightly-2024-10-15"
+components = [ "rustfmt", "clippy" ]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]