zealchen commented on code in PR #1604:
URL: https://github.com/apache/horaedb/pull/1604#discussion_r1881350493
##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -184,6 +203,264 @@ impl Manifest {
}
}
+/// Fixed-size header placed in front of the snapshot records.
+///
+/// Serialized layout: | magic(u32) | version(u8) | flag(u8) | length(u64) |.
+/// `#[repr(packed)]` removes padding, so `size_of::<SnapshotHeader>()` equals
+/// the number of bytes in that layout.
+#[repr(packed)]
+#[derive(Debug)]
+struct SnapshotHeader {
+ /// Magic number used to check the validity of the data source.
+ pub magic: u32,
+ /// Format version; `new` sets it to `SnapshotRecordV1::VERSION`.
+ pub version: u8,
+ /// Reserved for future extensions (e.g. compression); `new` sets it to 0.
+ pub flag: u8,
+ /// Total length in bytes of the records that follow the header.
+ pub length: u64,
+}
+
+impl SnapshotHeader {
+    // Serialized format (little-endian):
+    //   | magic(u32) | version(u8) | flags(u8) | length(u64) |
+    // - magic: used to ensure the validity of the data source.
+    // - flags: reserved for future extensibility, such as enabling
+    //   compression or supporting additional features.
+    // - length: total length of the subsequent records; it serves as a
+    //   straightforward method for verifying their integrity
+    //   (length = record_length * record_count).
+
+    /// Serialized size of the header in bytes.
+    ///
+    /// The struct is `#[repr(packed)]`, so its in-memory size carries no
+    /// padding and the field sizes do not have to be hard-coded here.
+    // TODO: is there a better solution than relying on `#[repr(packed)]`?
+    pub const LENGTH: usize = size_of::<SnapshotHeader>();
+    /// Magic number stored in the first four bytes of every header.
+    pub const MAGIC: u32 = 0x32A489BF;
+
+    /// Creates a header announcing `length` bytes of records, using the
+    /// current magic number and record version, with no flag set.
+    pub fn new(length: u64) -> Self {
+        Self {
+            magic: Self::MAGIC,
+            version: SnapshotRecordV1::VERSION,
+            flag: 0,
+            length,
+        }
+    }
+
+    /// Parses a header from the first [`Self::LENGTH`] bytes of `bytes`.
+    ///
+    /// # Errors
+    /// Returns an error if `bytes` is shorter than [`Self::LENGTH`].
+    ///
+    /// NOTE(review): the decoded magic and version are not validated here.
+    pub fn try_from_bytes(bytes: &[u8]) -> Result<Self> {
+        if bytes.len() < Self::LENGTH {
+            return Err(anyhow!("invalid bytes, length: {}", bytes.len()).into());
+        }
+
+        // Offsets mirror `write_to`: magic 0..4, version 4, flag 5,
+        // length 6..14.
+        let magic = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
+        let version = bytes[4];
+        let flag = bytes[5];
+        let length = u64::from_le_bytes(bytes[6..14].try_into().unwrap());
+        Ok(Self {
+            magic,
+            version,
+            flag,
+            length,
+        })
+    }
+
+    /// Serializes the header into `writter` in little-endian order; the
+    /// slice is advanced past the written bytes.
+    ///
+    /// # Errors
+    /// Returns an error if fewer than [`Self::LENGTH`] bytes remain in
+    /// `writter`.
+    pub fn write_to(&self, writter: &mut &mut [u8]) -> Result<()> {
+        if writter.len() < Self::LENGTH {
+            return Err(anyhow!(
+                "writter buf is too small for writing the header, length: {}",
+                writter.len()
+            )
+            .into());
+        }
+        // The length check above guarantees every `write_all` below fits, so
+        // the unwraps cannot fail. `write_all` (unlike `write`) never
+        // silently performs a short write.
+        writter.write_all(&self.magic.to_le_bytes()).unwrap();
+        writter.write_all(&self.version.to_le_bytes()).unwrap();
+        writter.write_all(&self.flag.to_le_bytes()).unwrap();
+        writter.write_all(&self.length.to_le_bytes()).unwrap();
+        Ok(())
+    }
+}
+
+/// Version 1 snapshot record describing a single SST file.
+///
+/// Serialized layout: | id(u64) | time_range(i64*2) | size(u32) | num_rows(u32) |.
+#[repr(packed)]
+#[derive(Debug)]
+struct SnapshotRecordV1 {
+ /// Id of the SST file.
+ id: u64,
+ /// Start of the file's time range (raw timestamp value).
+ time_range_start: i64,
+ /// End of the file's time range (raw timestamp value).
+ time_range_end: i64,
+ /// Size taken from the file's metadata.
+ size: u32,
+ /// Row count taken from the file's metadata.
+ num_rows: u32,
+}
+
+impl SnapshotRecordV1 {
+    // Serialized format (little-endian):
+    //   | id(u64) | time_range(i64*2) | size(u32) | num_rows(u32) |
+
+    /// Serialized size of one record in bytes (the struct is packed, so its
+    /// in-memory size carries no padding).
+    const LENGTH: usize = size_of::<SnapshotRecordV1>();
+    /// Version number of this record format.
+    pub const VERSION: u8 = 1;
+
+    /// Serializes the record into `writter` in little-endian order; the
+    /// slice is advanced past the written bytes.
+    ///
+    /// # Errors
+    /// Returns an error if fewer than `Self::LENGTH` bytes remain in
+    /// `writter`.
+    pub fn write_to(&self, writter: &mut &mut [u8]) -> Result<()> {
+        if writter.len() < Self::LENGTH {
+            return Err(anyhow!(
+                "writter buf is too small for writing the record, length: {}",
+                writter.len()
+            )
+            .into());
+        }
+        // The length check above guarantees every `write_all` below fits, so
+        // the unwraps cannot fail. `write_all` (unlike `write`) never
+        // silently performs a short write.
+        writter.write_all(&self.id.to_le_bytes()).unwrap();
+        writter
+            .write_all(&self.time_range_start.to_le_bytes())
+            .unwrap();
+        writter
+            .write_all(&self.time_range_end.to_le_bytes())
+            .unwrap();
+        writter.write_all(&self.size.to_le_bytes()).unwrap();
+        writter.write_all(&self.num_rows.to_le_bytes()).unwrap();
+        Ok(())
+    }
+
+    /// Returns the id of the SST file this record describes.
+    pub fn id(&self) -> u64 {
+        self.id
+    }
+}
+
+impl From<SstFile> for SnapshotRecordV1 {
+    /// Captures the id, time range, size and row count of `value` in a
+    /// snapshot record.
+    fn from(value: SstFile) -> Self {
+        let id = value.id();
+        let meta = value.meta();
+        Self {
+            id,
+            time_range_start: meta.time_range.start.0,
+            time_range_end: meta.time_range.end.0,
+            size: meta.size,
+            num_rows: meta.num_rows,
+        }
+    }
+}
+
+impl TryFrom<&[u8]> for SnapshotRecordV1 {
+    type Error = Error;
+
+    /// Decodes a record from the leading `SnapshotRecordV1::LENGTH` bytes of
+    /// `value` (little-endian); fails when `value` is shorter than that.
+    fn try_from(value: &[u8]) -> Result<Self> {
+        if value.len() < Self::LENGTH {
+            return Err(anyhow!("invalid value len: {}", value.len()).into());
+        }
+
+        // Peel the fixed-width fields off the front of the slice in order.
+        let (id_bytes, rest) = value.split_at(8);
+        let (start_bytes, rest) = rest.split_at(8);
+        let (end_bytes, rest) = rest.split_at(8);
+        let (size_bytes, rest) = rest.split_at(4);
+        let rows_bytes = &rest[..4];
+
+        Ok(Self {
+            id: u64::from_le_bytes(id_bytes.try_into().unwrap()),
+            time_range_start: i64::from_le_bytes(start_bytes.try_into().unwrap()),
+            time_range_end: i64::from_le_bytes(end_bytes.try_into().unwrap()),
+            size: u32::from_le_bytes(size_bytes.try_into().unwrap()),
+            num_rows: u32::from_le_bytes(rows_bytes.try_into().unwrap()),
+        })
+    }
+}
+
+// Implemented as `From` rather than `Into`: the standard blanket impl still
+// provides `SnapshotRecordV1: Into<SstFile>`, so existing `.into()` calls keep
+// working, and `SstFile::from(record)` becomes available as well.
+impl From<SnapshotRecordV1> for SstFile {
+    /// Rebuilds an `SstFile` from a snapshot record.
+    ///
+    /// The record id is used both as the file id and as `max_sequence` of the
+    /// file metadata.
+    fn from(record: SnapshotRecordV1) -> Self {
+        let file_meta = FileMeta {
+            max_sequence: record.id(),
+            num_rows: record.num_rows,
+            size: record.size,
+            time_range: TimeRange::new(
+                Timestamp::from(record.time_range_start),
+                Timestamp::from(record.time_range_end),
+            ),
+        };
+        SstFile::new(record.id(), file_meta)
+    }
+}
+
+/// A snapshot: a parsed header plus the raw bytes it was read from.
+struct Snapshot {
+ /// Header describing the records held in `inner`.
+ header: SnapshotHeader,
+ /// Raw snapshot bytes. When built by `try_from_bytes` this is the full
+ /// input, so records start at offset `SnapshotHeader::LENGTH`; it is empty
+ /// for the default snapshot.
+ inner: Bytes,
+}
+
+impl Default for Snapshot {
+    /// Returns an empty snapshot: a header announcing zero bytes of records
+    /// and no backing bytes.
+    fn default() -> Self {
+        Self {
+            header: SnapshotHeader::new(0),
+            inner: Bytes::new(),
+        }
+    }
+}
+
+impl Snapshot {
+    /// Builds a snapshot from its serialized form.
+    ///
+    /// Empty input yields the default (empty) snapshot. Otherwise the header
+    /// is parsed and, when it announces a non-zero record length, that length
+    /// must be a multiple of the record size and, together with the header,
+    /// account for exactly `bytes.len()` bytes.
+    ///
+    /// NOTE(review): when the header announces a zero length, trailing bytes
+    /// after the header are not rejected — confirm this is intended.
+    pub fn try_from_bytes(bytes: Bytes) -> Result<Self> {
+        if bytes.is_empty() {
+            return Ok(Self::default());
+        }
+
+        let header = SnapshotHeader::try_from_bytes(bytes.as_bytes())?;
+        let header_length = header.length as usize;
+        if header_length != 0 {
+            let aligned = header_length % SnapshotRecordV1::LENGTH == 0;
+            if !aligned || header_length + SnapshotHeader::LENGTH != bytes.len() {
+                return Err(anyhow!(
+                    "create snapshot from bytes failed, invalid bytes, header length = {}, total length: {}",
+                    header_length,
+                    bytes.len()
+                )
+                .into());
+            }
+        }
+
+        Ok(Self {
+            header,
+            inner: bytes,
+        })
+    }
+
+    /// Decodes every record stored after the header into an `SstFile`.
+    ///
+    /// Returns an empty vector when the header announces zero bytes of
+    /// records; fails if any record cannot be decoded.
+    pub fn to_sstfiles(&self) -> Result<Vec<SstFile>> {
+        if self.header.length == 0 {
+            return Ok(Vec::new());
+        }
+
+        let buf = self.inner.as_bytes();
+        let mut files: Vec<SstFile> = Vec::new();
+        // Records are laid out back to back right after the header.
+        for start in (SnapshotHeader::LENGTH..buf.len()).step_by(SnapshotRecordV1::LENGTH) {
+            let end = start + SnapshotRecordV1::LENGTH;
+            let record = SnapshotRecordV1::try_from(&buf[start..end])?;
+            files.push(record.into());
+        }
+        Ok(files)
+    }
+
+    /// Removes from `sstfiles` every file whose id already appears in this
+    /// snapshot's records.
+    ///
+    /// Fails if any stored record cannot be decoded.
+    pub fn dedup_sstfiles(&self, sstfiles: &mut Vec<SstFile>) -> Result<()> {
+        let buf = self.inner.as_bytes();
+
+        // Collect the ids of all records laid out after the header.
+        let mut known_ids = HashSet::new();
+        for start in (SnapshotHeader::LENGTH..buf.len()).step_by(SnapshotRecordV1::LENGTH) {
+            let end = start + SnapshotRecordV1::LENGTH;
+            let record = SnapshotRecordV1::try_from(&buf[start..end])?;
+            known_ids.insert(record.id());
+        }
+
+        sstfiles.retain(|file| !known_ids.contains(&file.id()));
+        Ok(())
+    }
+
+ pub fn merge_sstfiles(&mut self, sstfiles: Vec<SstFile>) {
+ // update header
+ self.header.length += (sstfiles.len() * SnapshotRecordV1::LENGTH) as
u64;
+ // final snapshot
+ let mut snapshot = vec![1u8; SnapshotHeader::LENGTH +
self.header.length as usize];
Review Comment:
The fill value should be `0u8`, not `1u8`. It's simply a mistake.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]