This is an automated email from the ASF dual-hosted git repository.
baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 58db4f5f refactor: use `Read` trait to convert Bytes to Snapshot (#1612)
58db4f5f is described below
commit 58db4f5f99da6af0f184e47a590b7de44ff7d5e9
Author: Jiacai Liu <[email protected]>
AuthorDate: Wed Dec 18 11:55:24 2024 +0800
refactor: use `Read` trait to convert Bytes to Snapshot (#1612)
## Rationale
## Detailed Changes
## Test Plan
CI
---
.github/workflows/ci.yml | 6 --
src/metric_engine/src/manifest/encoding.rs | 148 +++++++++++------------------
2 files changed, 58 insertions(+), 96 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 73775bc6..ea01d87e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -57,9 +57,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- - name: Release Disk Quota
- run: |
- sudo make ensure-disk-quota
- name: Setup Build Environment
run: |
sudo apt update
@@ -81,9 +78,6 @@ jobs:
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- - name: Release Disk Quota
- run: |
- sudo make ensure-disk-quota
- name: Setup Build Environment
run: |
sudo apt update
diff --git a/src/metric_engine/src/manifest/encoding.rs b/src/metric_engine/src/manifest/encoding.rs
index 72d43d6e..9ec0ecd6 100644
--- a/src/metric_engine/src/manifest/encoding.rs
+++ b/src/metric_engine/src/manifest/encoding.rs
@@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use std::io::{Cursor, Write};
+use std::io::{Cursor, Read, Write};
use anyhow::Context;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
-use bytes::Bytes;
-use parquet::data_type::AsBytes;
+use bytes::{Buf, Bytes};
use crate::{
ensure,
@@ -88,7 +87,7 @@ impl From<ManifestUpdate> for pb_types::ManifestUpdate {
/// - The length field (u64) represents the total length of the subsequent
/// records and serves as a straightforward method for verifying their
/// integrity. (length = record_length * record_count)
-#[derive(Debug)]
+#[derive(Debug, PartialEq, Eq)]
pub struct SnapshotHeader {
pub magic: u32,
pub version: u8,
@@ -96,27 +95,33 @@ pub struct SnapshotHeader {
pub length: u64,
}
-impl TryFrom<&[u8]> for SnapshotHeader {
- type Error = Error;
+impl SnapshotHeader {
+ pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 /*length*/;
+ pub const MAGIC: u32 = 0xCAFE_1234;
- fn try_from(bytes: &[u8]) -> Result<Self> {
- ensure!(
- bytes.len() >= Self::LENGTH,
- "invalid bytes, length: {}",
- bytes.len()
- );
+ pub fn new(length: u64) -> Self {
+ Self {
+ magic: SnapshotHeader::MAGIC,
+ version: SnapshotRecord::VERSION,
+ flag: 0,
+ length,
+ }
+ }
- let mut cursor = Cursor::new(bytes);
- let magic = cursor
+ pub fn try_new<R>(mut reader: R) -> Result<Self>
+ where
+ R: Read,
+ {
+ let magic = reader
.read_u32::<LittleEndian>()
.context("read snapshot header magic")?;
ensure!(
magic == SnapshotHeader::MAGIC,
"invalid bytes to convert to header."
);
- let version = cursor.read_u8().context("read snapshot header version")?;
- let flag = cursor.read_u8().context("read snapshot header flag")?;
- let length = cursor
+ let version = reader.read_u8().context("read snapshot header version")?;
+ let flag = reader.read_u8().context("read snapshot header flag")?;
+ let length = reader
.read_u64::<LittleEndian>()
.context("read snapshot header length")?;
Ok(Self {
@@ -126,20 +131,6 @@ impl TryFrom<&[u8]> for SnapshotHeader {
length,
})
}
-}
-
-impl SnapshotHeader {
- pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8 /*length*/;
- pub const MAGIC: u32 = 0xCAFE_1234;
-
- pub fn new(length: u64) -> Self {
- Self {
- magic: SnapshotHeader::MAGIC,
- version: SnapshotRecord::VERSION,
- flag: 0,
- length,
- }
- }
pub fn write_to<W>(&self, mut writer: W) -> Result<()>
where
@@ -167,6 +158,7 @@ impl SnapshotHeader {
/// | id(u64) | time_range(i64*2) | size(u32) | num_rows(u32) |
/// +---------+-------------------+------------+-----------------+
/// ```
+#[derive(Debug, PartialEq, Eq)]
pub struct SnapshotRecord {
id: u64,
time_range: TimeRange,
@@ -216,30 +208,24 @@ impl From<SstFile> for SnapshotRecord {
}
}
-impl TryFrom<&[u8]> for SnapshotRecord {
- type Error = Error;
-
- fn try_from(value: &[u8]) -> Result<Self> {
- ensure!(
- value.len() >= SnapshotRecord::LENGTH,
- "invalid value len: {}",
- value.len()
- );
-
- let mut cursor = Cursor::new(value);
- let id = cursor
+impl SnapshotRecord {
+ fn try_new<R>(mut reader: R) -> Result<Self>
+ where
+ R: Read,
+ {
+ let id = reader
.read_u64::<LittleEndian>()
.context("read record id")?;
- let start = cursor
+ let start = reader
.read_i64::<LittleEndian>()
.context("read record start")?;
- let end = cursor
+ let end = reader
.read_i64::<LittleEndian>()
.context("read record end")?;
- let size = cursor
+ let size = reader
.read_u32::<LittleEndian>()
.context("read record size")?;
- let num_rows = cursor
+ let num_rows = reader
.read_u32::<LittleEndian>()
.context("read record num_rows")?;
Ok(SnapshotRecord {
@@ -286,21 +272,20 @@ impl TryFrom<Bytes> for Snapshot {
if bytes.is_empty() {
return Ok(Snapshot::default());
}
- let header = SnapshotHeader::try_from(bytes.as_bytes())?;
+ let bytes_len = bytes.len();
+ let mut cursor = Cursor::new(bytes);
+ let header = SnapshotHeader::try_new(&mut cursor)?;
let record_total_length = header.length as usize;
ensure!(
record_total_length > 0
&& record_total_length % SnapshotRecord::LENGTH == 0
- && record_total_length + SnapshotHeader::LENGTH == bytes.len(),
- "create snapshot from bytes failed, header:{header:?}, bytes_length: {}",
- bytes.len()
+ && record_total_length + SnapshotHeader::LENGTH == bytes_len,
+ "create snapshot from bytes failed, header:{header:?}, bytes_length: {bytes_len}",
);
- let mut index = SnapshotHeader::LENGTH;
let mut records = Vec::with_capacity(record_total_length / SnapshotRecord::LENGTH);
- while index < bytes.len() {
- let record = SnapshotRecord::try_from(&bytes[index..index + SnapshotRecord::LENGTH])?;
+ while cursor.has_remaining() {
+ let record = SnapshotRecord::try_new(&mut cursor)?;
records.push(record);
- index += SnapshotRecord::LENGTH;
}
Ok(Self { header, records })
@@ -351,23 +336,17 @@ mod tests {
let mut writer = vec.as_mut_slice();
header.write_to(&mut writer).unwrap();
assert!(writer.is_empty());
- let mut cursor = Cursor::new(vec);
+ let cursor = Cursor::new(vec);
+ let header = SnapshotHeader::try_new(cursor).unwrap();
assert_eq!(
- SnapshotHeader::MAGIC,
- cursor.read_u32::<LittleEndian>().unwrap()
- );
- assert_eq!(
- 1, // version
- cursor.read_u8().unwrap()
- );
- assert_eq!(
- 0, // flag
- cursor.read_u8().unwrap()
- );
- assert_eq!(
- 257, // length
- cursor.read_u64::<LittleEndian>().unwrap()
+ SnapshotHeader {
+ magic: SnapshotHeader::MAGIC,
+ version: 1,
+ flag: 0,
+ length: 257
+ },
+ header
);
}
@@ -388,27 +367,16 @@ mod tests {
record.write_to(&mut writer).unwrap();
assert!(writer.is_empty());
- let mut cursor = Cursor::new(vec);
-
- assert_eq!(
- 99, // id
- cursor.read_u64::<LittleEndian>().unwrap()
- );
- assert_eq!(
- 100, // start range
- cursor.read_i64::<LittleEndian>().unwrap()
- );
+ let cursor = Cursor::new(vec);
+ let record = SnapshotRecord::try_new(cursor).unwrap();
assert_eq!(
- 200, // end range
- cursor.read_i64::<LittleEndian>().unwrap()
- );
- assert_eq!(
- 938, // size
- cursor.read_u32::<LittleEndian>().unwrap()
- );
- assert_eq!(
- 100, // num rows
- cursor.read_u32::<LittleEndian>().unwrap()
+ SnapshotRecord {
+ id: 99,
+ time_range: (100..200).into(),
+ size: 938,
+ num_rows: 100
+ },
+ record
);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]