This is an automated email from the ASF dual-hosted git repository.
baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new cc3352a7 refactor: move encoding struct into independent file (#1611)
cc3352a7 is described below
commit cc3352a7ae501847e3599d89788d4239d46a1475
Author: Jiacai Liu <[email protected]>
AuthorDate: Wed Dec 18 11:21:30 2024 +0800
refactor: move encoding struct into independent file (#1611)
## Rationale
Followup #1610, to make manifest more modular.
## Detailed Changes
## Test Plan
CI
---
src/metric_engine/src/compaction/executor.rs | 8 +-
src/metric_engine/src/manifest/encoding.rs | 348 ++++++++++++++++++++-
.../src/{manifest.rs => manifest/mod.rs} | 333 +-------------------
3 files changed, 354 insertions(+), 335 deletions(-)
diff --git a/src/metric_engine/src/compaction/executor.rs
b/src/metric_engine/src/compaction/executor.rs
index 9f41ec82..c5cf83e2 100644
--- a/src/metric_engine/src/compaction/executor.rs
+++ b/src/metric_engine/src/compaction/executor.rs
@@ -99,7 +99,7 @@ impl Executor {
let inused = self.inner.inused_memory.load(Ordering::Relaxed);
let mem_limit = self.inner.mem_limit;
ensure!(
- inused + task_size > mem_limit,
+ inused + task_size <= mem_limit,
"Compaction memory usage too high, inused:{inused},
task_size:{task_size}, limit:{mem_limit}"
);
@@ -113,7 +113,7 @@ impl Executor {
let task_size = task.input_size();
self.inner
.inused_memory
- .fetch_add(task_size, Ordering::Relaxed);
+ .fetch_sub(task_size, Ordering::Relaxed);
}
pub fn on_failure(&self, task: &Task) {
@@ -137,7 +137,7 @@ impl Executor {
executor: self.clone(),
task,
};
- runnable.run()
+ runnable.spawn()
}
// TODO: Merge input sst files into one new sst file
@@ -257,7 +257,7 @@ pub struct Runnable {
}
impl Runnable {
- fn run(self) {
+ fn spawn(self) {
let rt = self.executor.inner.runtime.clone();
rt.spawn(async move {
if let Err(e) = self.executor.do_compaction(&self.task).await {
diff --git a/src/metric_engine/src/manifest/encoding.rs
b/src/metric_engine/src/manifest/encoding.rs
index 01deb7a5..72d43d6e 100644
--- a/src/metric_engine/src/manifest/encoding.rs
+++ b/src/metric_engine/src/manifest/encoding.rs
@@ -15,8 +15,17 @@
// specific language governing permissions and limitations
// under the License.
+use std::io::{Cursor, Write};
+
+use anyhow::Context;
+use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
+use bytes::Bytes;
+use parquet::data_type::AsBytes;
+
use crate::{
- sst::{FileId, SstFile},
+ ensure,
+ sst::{FileId, FileMeta, SstFile},
+ types::TimeRange,
Error, Result,
};
@@ -66,3 +75,340 @@ impl From<ManifestUpdate> for pb_types::ManifestUpdate {
}
}
}
+
+/// The layout for the header.
+/// ```plaintext
+/// +-------------+--------------+------------+--------------+
+/// | magic(u32) | version(u8) | flag(u8) | length(u32) |
+/// +-------------+--------------+------------+--------------+
+/// ```
+/// - The Magic field (u32) is used to ensure the validity of the data source.
+/// - The Flags field (u8) is reserved for future extensibility, such as
+/// enabling compression or supporting additional features.
+/// - The length field (u64) represents the total length of the subsequent
+/// records and serves as a straightforward method for verifying their
+/// integrity. (length = record_length * record_count)
+#[derive(Debug)]
+pub struct SnapshotHeader {
+ pub magic: u32,
+ pub version: u8,
+ pub flag: u8,
+ pub length: u64,
+}
+
+impl TryFrom<&[u8]> for SnapshotHeader {
+ type Error = Error;
+
+ fn try_from(bytes: &[u8]) -> Result<Self> {
+ ensure!(
+ bytes.len() >= Self::LENGTH,
+ "invalid bytes, length: {}",
+ bytes.len()
+ );
+
+ let mut cursor = Cursor::new(bytes);
+ let magic = cursor
+ .read_u32::<LittleEndian>()
+ .context("read snapshot header magic")?;
+ ensure!(
+ magic == SnapshotHeader::MAGIC,
+ "invalid bytes to convert to header."
+ );
+ let version = cursor.read_u8().context("read snapshot header
version")?;
+ let flag = cursor.read_u8().context("read snapshot header flag")?;
+ let length = cursor
+ .read_u64::<LittleEndian>()
+ .context("read snapshot header length")?;
+ Ok(Self {
+ magic,
+ version,
+ flag,
+ length,
+ })
+ }
+}
+
+impl SnapshotHeader {
+ pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8
/*length*/;
+ pub const MAGIC: u32 = 0xCAFE_1234;
+
+ pub fn new(length: u64) -> Self {
+ Self {
+ magic: SnapshotHeader::MAGIC,
+ version: SnapshotRecord::VERSION,
+ flag: 0,
+ length,
+ }
+ }
+
+ pub fn write_to<W>(&self, mut writer: W) -> Result<()>
+ where
+ W: Write,
+ {
+ writer
+ .write_u32::<LittleEndian>(self.magic)
+ .context("write shall not fail.")?;
+ writer
+ .write_u8(self.version)
+ .context("write shall not fail.")?;
+ writer
+ .write_u8(self.flag)
+ .context("write shall not fail.")?;
+ writer
+ .write_u64::<LittleEndian>(self.length)
+ .context("write shall not fail.")?;
+ Ok(())
+ }
+}
+
+/// The layout for manifest Record:
+/// ```plaintext
+/// +---------+-------------------+------------+-----------------+
+/// | id(u64) | time_range(i64*2) | size(u32) | num_rows(u32) |
+/// +---------+-------------------+------------+-----------------+
+/// ```
+pub struct SnapshotRecord {
+ id: u64,
+ time_range: TimeRange,
+ size: u32,
+ num_rows: u32,
+}
+
+impl SnapshotRecord {
+ const LENGTH: usize = 8 /*id*/+ 16 /*time range*/ + 4 /*size*/ + 4 /*num
rows*/;
+ pub const VERSION: u8 = 1;
+
+ pub fn write_to<W>(&self, mut writer: W) -> Result<()>
+ where
+ W: Write,
+ {
+ writer
+ .write_u64::<LittleEndian>(self.id)
+ .context("write shall not fail.")?;
+ writer
+ .write_i64::<LittleEndian>(*self.time_range.start)
+ .context("write shall not fail.")?;
+ writer
+ .write_i64::<LittleEndian>(*self.time_range.end)
+ .context("write shall not fail.")?;
+ writer
+ .write_u32::<LittleEndian>(self.size)
+ .context("write shall not fail.")?;
+ writer
+ .write_u32::<LittleEndian>(self.num_rows)
+ .context("write shall not fail.")?;
+ Ok(())
+ }
+
+ pub fn id(&self) -> u64 {
+ self.id
+ }
+}
+
+impl From<SstFile> for SnapshotRecord {
+ fn from(value: SstFile) -> Self {
+ SnapshotRecord {
+ id: value.id(),
+ time_range: value.meta().time_range.clone(),
+ size: value.meta().size,
+ num_rows: value.meta().num_rows,
+ }
+ }
+}
+
+impl TryFrom<&[u8]> for SnapshotRecord {
+ type Error = Error;
+
+ fn try_from(value: &[u8]) -> Result<Self> {
+ ensure!(
+ value.len() >= SnapshotRecord::LENGTH,
+ "invalid value len: {}",
+ value.len()
+ );
+
+ let mut cursor = Cursor::new(value);
+ let id = cursor
+ .read_u64::<LittleEndian>()
+ .context("read record id")?;
+ let start = cursor
+ .read_i64::<LittleEndian>()
+ .context("read record start")?;
+ let end = cursor
+ .read_i64::<LittleEndian>()
+ .context("read record end")?;
+ let size = cursor
+ .read_u32::<LittleEndian>()
+ .context("read record size")?;
+ let num_rows = cursor
+ .read_u32::<LittleEndian>()
+ .context("read record num_rows")?;
+ Ok(SnapshotRecord {
+ id,
+ time_range: (start..end).into(),
+ size,
+ num_rows,
+ })
+ }
+}
+
+impl From<SnapshotRecord> for SstFile {
+ fn from(record: SnapshotRecord) -> Self {
+ let file_meta = FileMeta {
+ max_sequence: record.id(),
+ num_rows: record.num_rows,
+ size: record.size,
+ time_range: record.time_range.clone(),
+ };
+ SstFile::new(record.id(), file_meta)
+ }
+}
+
+pub struct Snapshot {
+ header: SnapshotHeader,
+ records: Vec<SnapshotRecord>,
+}
+
+impl Default for Snapshot {
+ // create an empty Snapshot
+ fn default() -> Self {
+ let header = SnapshotHeader::new(0);
+ Self {
+ header,
+ records: Vec::new(),
+ }
+ }
+}
+
+impl TryFrom<Bytes> for Snapshot {
+ type Error = Error;
+
+ fn try_from(bytes: Bytes) -> Result<Self> {
+ if bytes.is_empty() {
+ return Ok(Snapshot::default());
+ }
+ let header = SnapshotHeader::try_from(bytes.as_bytes())?;
+ let record_total_length = header.length as usize;
+ ensure!(
+ record_total_length > 0
+ && record_total_length % SnapshotRecord::LENGTH == 0
+ && record_total_length + SnapshotHeader::LENGTH == bytes.len(),
+ "create snapshot from bytes failed, header:{header:?},
bytes_length: {}",
+ bytes.len()
+ );
+ let mut index = SnapshotHeader::LENGTH;
+ let mut records = Vec::with_capacity(record_total_length /
SnapshotRecord::LENGTH);
+ while index < bytes.len() {
+ let record = SnapshotRecord::try_from(&bytes[index..index +
SnapshotRecord::LENGTH])?;
+ records.push(record);
+ index += SnapshotRecord::LENGTH;
+ }
+
+ Ok(Self { header, records })
+ }
+}
+
+impl Snapshot {
+ pub fn into_ssts(self) -> Vec<SstFile> {
+ if self.header.length == 0 {
+ Vec::new()
+ } else {
+ self.records.into_iter().map(|r| r.into()).collect()
+ }
+ }
+
+ // TODO: Ensure no files duplicated
+ // https://github.com/apache/horaedb/issues/1608
+ pub fn merge_update(&mut self, update: ManifestUpdate) -> Result<()> {
+ self.records
+ .extend(update.to_adds.into_iter().map(SnapshotRecord::from));
+ self.records
+ .retain(|record| !update.to_deletes.contains(&record.id));
+
+ self.header.length = (self.records.len() * SnapshotRecord::LENGTH) as
u64;
+ Ok(())
+ }
+
+ pub fn into_bytes(self) -> Result<Bytes> {
+ let buf = Vec::with_capacity(self.header.length as usize +
SnapshotHeader::LENGTH);
+ let mut cursor = Cursor::new(buf);
+
+ self.header.write_to(&mut cursor)?;
+ for record in self.records {
+ record.write_to(&mut cursor)?;
+ }
+ Ok(Bytes::from(cursor.into_inner()))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_snapshot_header() {
+ let header = SnapshotHeader::new(257);
+ let mut vec = vec![0u8; SnapshotHeader::LENGTH];
+ let mut writer = vec.as_mut_slice();
+ header.write_to(&mut writer).unwrap();
+ assert!(writer.is_empty());
+ let mut cursor = Cursor::new(vec);
+
+ assert_eq!(
+ SnapshotHeader::MAGIC,
+ cursor.read_u32::<LittleEndian>().unwrap()
+ );
+ assert_eq!(
+ 1, // version
+ cursor.read_u8().unwrap()
+ );
+ assert_eq!(
+ 0, // flag
+ cursor.read_u8().unwrap()
+ );
+ assert_eq!(
+ 257, // length
+ cursor.read_u64::<LittleEndian>().unwrap()
+ );
+ }
+
+ #[test]
+ fn test_snapshot_record() {
+ let sstfile = SstFile::new(
+ 99,
+ FileMeta {
+ max_sequence: 99,
+ num_rows: 100,
+ size: 938,
+ time_range: (100..200).into(),
+ },
+ );
+ let record: SnapshotRecord = sstfile.into();
+ let mut vec: Vec<u8> = vec![0u8; SnapshotRecord::LENGTH];
+ let mut writer = vec.as_mut_slice();
+ record.write_to(&mut writer).unwrap();
+
+ assert!(writer.is_empty());
+ let mut cursor = Cursor::new(vec);
+
+ assert_eq!(
+ 99, // id
+ cursor.read_u64::<LittleEndian>().unwrap()
+ );
+ assert_eq!(
+ 100, // start range
+ cursor.read_i64::<LittleEndian>().unwrap()
+ );
+ assert_eq!(
+ 200, // end range
+ cursor.read_i64::<LittleEndian>().unwrap()
+ );
+ assert_eq!(
+ 938, // size
+ cursor.read_u32::<LittleEndian>().unwrap()
+ );
+ assert_eq!(
+ 100, // num rows
+ cursor.read_u32::<LittleEndian>().unwrap()
+ );
+ }
+}
diff --git a/src/metric_engine/src/manifest.rs
b/src/metric_engine/src/manifest/mod.rs
similarity index 59%
rename from src/metric_engine/src/manifest.rs
rename to src/metric_engine/src/manifest/mod.rs
index 17b188d6..ffc45567 100644
--- a/src/metric_engine/src/manifest.rs
+++ b/src/metric_engine/src/manifest/mod.rs
@@ -17,7 +17,6 @@
mod encoding;
use std::{
- io::{Cursor, Write},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
@@ -27,12 +26,11 @@ use std::{
use anyhow::Context;
use async_scoped::TokioScope;
-use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use bytes::Bytes;
pub use encoding::ManifestUpdate;
+use encoding::Snapshot;
use futures::{StreamExt, TryStreamExt};
use object_store::{path::Path, PutPayload};
-use parquet::data_type::AsBytes;
use prost::Message;
use tokio::sync::{
mpsc::{self, Receiver, Sender},
@@ -42,10 +40,9 @@ use tracing::error;
use uuid::Uuid;
use crate::{
- ensure,
sst::{FileId, FileMeta, SstFile},
types::{ManifestMergeOptions, ObjectStoreRef, RuntimeRef, TimeRange},
- AnyhowError, Error, Result,
+ AnyhowError, Result,
};
pub const PREFIX_PATH: &str = "manifest";
@@ -152,257 +149,6 @@ impl Manifest {
}
}
-/// The layout for the header.
-/// ```plaintext
-/// +-------------+--------------+------------+--------------+
-/// | magic(u32) | version(u8) | flag(u8) | length(u32) |
-/// +-------------+--------------+------------+--------------+
-/// ```
-/// - The Magic field (u32) is used to ensure the validity of the data source.
-/// - The Flags field (u8) is reserved for future extensibility, such as
-/// enabling compression or supporting additional features.
-/// - The length field (u64) represents the total length of the subsequent
-/// records and serves as a straightforward method for verifying their
-/// integrity. (length = record_length * record_count)
-#[derive(Debug)]
-struct SnapshotHeader {
- pub magic: u32,
- pub version: u8,
- pub flag: u8,
- pub length: u64,
-}
-
-impl TryFrom<&[u8]> for SnapshotHeader {
- type Error = Error;
-
- fn try_from(bytes: &[u8]) -> Result<Self> {
- ensure!(
- bytes.len() >= Self::LENGTH,
- "invalid bytes, length: {}",
- bytes.len()
- );
-
- let mut cursor = Cursor::new(bytes);
- let magic = cursor.read_u32::<LittleEndian>().unwrap();
- ensure!(
- magic == SnapshotHeader::MAGIC,
- "invalid bytes to convert to header."
- );
- let version = cursor.read_u8().unwrap();
- let flag = cursor.read_u8().unwrap();
- let length = cursor.read_u64::<LittleEndian>().unwrap();
- Ok(Self {
- magic,
- version,
- flag,
- length,
- })
- }
-}
-
-impl SnapshotHeader {
- pub const LENGTH: usize = 4 /*magic*/ + 1 /*version*/ + 1 /*flag*/ + 8
/*length*/;
- pub const MAGIC: u32 = 0xCAFE_6666;
-
- pub fn new(length: u64) -> Self {
- Self {
- magic: SnapshotHeader::MAGIC,
- version: SnapshotRecordV1::VERSION,
- flag: 0,
- length,
- }
- }
-
- pub fn write_to<W>(&self, mut writer: W) -> Result<()>
- where
- W: Write,
- {
- writer
- .write_u32::<LittleEndian>(self.magic)
- .context("write shall not fail.")?;
- writer
- .write_u8(self.version)
- .context("write shall not fail.")?;
- writer
- .write_u8(self.flag)
- .context("write shall not fail.")?;
- writer
- .write_u64::<LittleEndian>(self.length)
- .context("write shall not fail.")?;
- Ok(())
- }
-}
-
-/// The layout for manifest Record:
-/// ```plaintext
-/// +---------+-------------------+------------+-----------------+
-/// | id(u64) | time_range(i64*2) | size(u32) | num_rows(u32) |
-/// +---------+-------------------+------------+-----------------+
-/// ```
-struct SnapshotRecordV1 {
- id: u64,
- time_range: TimeRange,
- size: u32,
- num_rows: u32,
-}
-
-impl SnapshotRecordV1 {
- const LENGTH: usize = 8 /*id*/+ 16 /*time range*/ + 4 /*size*/ + 4 /*num
rows*/;
- pub const VERSION: u8 = 1;
-
- pub fn write_to<W>(&self, mut writer: W) -> Result<()>
- where
- W: Write,
- {
- writer
- .write_u64::<LittleEndian>(self.id)
- .context("write shall not fail.")?;
- writer
- .write_i64::<LittleEndian>(*self.time_range.start)
- .context("write shall not fail.")?;
- writer
- .write_i64::<LittleEndian>(*self.time_range.end)
- .context("write shall not fail.")?;
- writer
- .write_u32::<LittleEndian>(self.size)
- .context("write shall not fail.")?;
- writer
- .write_u32::<LittleEndian>(self.num_rows)
- .context("write shall not fail.")?;
- Ok(())
- }
-
- pub fn id(&self) -> u64 {
- self.id
- }
-}
-
-impl From<SstFile> for SnapshotRecordV1 {
- fn from(value: SstFile) -> Self {
- SnapshotRecordV1 {
- id: value.id(),
- time_range: value.meta().time_range.clone(),
- size: value.meta().size,
- num_rows: value.meta().num_rows,
- }
- }
-}
-
-impl TryFrom<&[u8]> for SnapshotRecordV1 {
- type Error = Error;
-
- fn try_from(value: &[u8]) -> Result<Self> {
- ensure!(
- value.len() >= SnapshotRecordV1::LENGTH,
- "invalid value len: {}",
- value.len()
- );
-
- let mut cursor = Cursor::new(value);
- let id = cursor.read_u64::<LittleEndian>().unwrap();
- let start = cursor.read_i64::<LittleEndian>().unwrap();
- let end = cursor.read_i64::<LittleEndian>().unwrap();
- let size = cursor.read_u32::<LittleEndian>().unwrap();
- let num_rows = cursor.read_u32::<LittleEndian>().unwrap();
- Ok(SnapshotRecordV1 {
- id,
- time_range: (start..end).into(),
- size,
- num_rows,
- })
- }
-}
-
-impl From<SnapshotRecordV1> for SstFile {
- fn from(record: SnapshotRecordV1) -> Self {
- let file_meta = FileMeta {
- max_sequence: record.id(),
- num_rows: record.num_rows,
- size: record.size,
- time_range: record.time_range.clone(),
- };
- SstFile::new(record.id(), file_meta)
- }
-}
-
-struct Snapshot {
- header: SnapshotHeader,
- records: Vec<SnapshotRecordV1>,
-}
-
-impl Default for Snapshot {
- // create an empty Snapshot
- fn default() -> Self {
- let header = SnapshotHeader::new(0);
- Self {
- header,
- records: Vec::new(),
- }
- }
-}
-
-impl TryFrom<Bytes> for Snapshot {
- type Error = Error;
-
- fn try_from(bytes: Bytes) -> Result<Self> {
- if bytes.is_empty() {
- return Ok(Snapshot::default());
- }
- let header = SnapshotHeader::try_from(bytes.as_bytes())?;
- let record_total_length = header.length as usize;
- ensure!(
- record_total_length > 0
- && record_total_length % SnapshotRecordV1::LENGTH == 0
- && record_total_length + SnapshotHeader::LENGTH == bytes.len(),
- "create snapshot from bytes failed, header:{header:?},
bytes_length: {}",
- bytes.len()
- );
- let mut index = SnapshotHeader::LENGTH;
- let mut records = Vec::with_capacity(record_total_length /
SnapshotRecordV1::LENGTH);
- while index < bytes.len() {
- let record =
- SnapshotRecordV1::try_from(&bytes[index..index +
SnapshotRecordV1::LENGTH])?;
- records.push(record);
- index += SnapshotRecordV1::LENGTH;
- }
-
- Ok(Self { header, records })
- }
-}
-
-impl Snapshot {
- pub fn into_ssts(self) -> Vec<SstFile> {
- if self.header.length == 0 {
- Vec::new()
- } else {
- self.records.into_iter().map(|r| r.into()).collect()
- }
- }
-
- // TODO: Ensure no files duplicated
- // https://github.com/apache/horaedb/issues/1608
- pub fn merge_update(&mut self, update: ManifestUpdate) -> Result<()> {
- self.records
- .extend(update.to_adds.into_iter().map(SnapshotRecordV1::from));
- self.records
- .retain(|record| !update.to_deletes.contains(&record.id));
-
- self.header.length = (self.records.len() * SnapshotRecordV1::LENGTH)
as u64;
- Ok(())
- }
-
- pub fn into_bytes(self) -> Result<Bytes> {
- let buf = Vec::with_capacity(self.header.length as usize +
SnapshotHeader::LENGTH);
- let mut cursor = Cursor::new(buf);
-
- self.header.write_to(&mut cursor)?;
- for record in self.records {
- record.write_to(&mut cursor).unwrap();
- }
- Ok(Bytes::from(cursor.into_inner()))
- }
-}
-
enum MergeType {
Hard,
Soft,
@@ -605,7 +351,6 @@ async fn list_delta_paths(store: &ObjectStoreRef,
delta_dir: &Path) -> Result<Ve
mod tests {
use std::sync::Arc;
- use itertools::Itertools;
use object_store::local::LocalFileSystem;
use tokio::time::sleep;
@@ -705,11 +450,7 @@ mod tests {
let mut mem_ssts = manifest.ssts.read().await.clone();
let snapshot = read_snapshot(&store,
&snapshot_path).await.unwrap();
- let mut ssts = snapshot
- .records
- .into_iter()
- .map(SstFile::from)
- .collect_vec();
+ let mut ssts = snapshot.into_ssts();
mem_ssts.sort_by_key(|a| a.id());
ssts.sort_by_key(|a| a.id());
@@ -719,72 +460,4 @@ mod tests {
assert!(delta_paths.is_empty());
})
}
-
- #[test]
- fn test_snapshot_header() {
- let header = SnapshotHeader::new(257);
- let mut vec = vec![0u8; SnapshotHeader::LENGTH];
- let mut writer = vec.as_mut_slice();
- header.write_to(&mut writer).unwrap();
- assert!(writer.is_empty());
- let mut cursor = Cursor::new(vec);
-
- assert_eq!(
- SnapshotHeader::MAGIC,
- cursor.read_u32::<LittleEndian>().unwrap()
- );
- assert_eq!(
- 1, // version
- cursor.read_u8().unwrap()
- );
- assert_eq!(
- 0, // flag
- cursor.read_u8().unwrap()
- );
- assert_eq!(
- 257, // length
- cursor.read_u64::<LittleEndian>().unwrap()
- );
- }
-
- #[test]
- fn test_snapshot_record() {
- let sstfile = SstFile::new(
- 99,
- FileMeta {
- max_sequence: 99,
- num_rows: 100,
- size: 938,
- time_range: (100..200).into(),
- },
- );
- let record: SnapshotRecordV1 = sstfile.into();
- let mut vec: Vec<u8> = vec![0u8; SnapshotRecordV1::LENGTH];
- let mut writer = vec.as_mut_slice();
- record.write_to(&mut writer).unwrap();
-
- assert!(writer.is_empty());
- let mut cursor = Cursor::new(vec);
-
- assert_eq!(
- 99, // id
- cursor.read_u64::<LittleEndian>().unwrap()
- );
- assert_eq!(
- 100, // start range
- cursor.read_i64::<LittleEndian>().unwrap()
- );
- assert_eq!(
- 200, // end range
- cursor.read_i64::<LittleEndian>().unwrap()
- );
- assert_eq!(
- 938, // size
- cursor.read_u32::<LittleEndian>().unwrap()
- );
- assert_eq!(
- 100, // num rows
- cursor.read_u32::<LittleEndian>().unwrap()
- );
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]