This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 4642a356 fix: remove redundancy length field in wal record (#1576)
4642a356 is described below
commit 4642a356c3df74c838e136304015db9b83d17960
Author: Jiacai Liu <[email protected]>
AuthorDate: Thu Oct 17 10:19:18 2024 +0800
fix: remove redundancy length field in wal record (#1576)
## Rationale
The `length` field is not required in the wal record; it duplicates the
`value_length` field.
## Detailed Changes
- Remove length from wal record
- Remove rocksdb-wal from default features
## Test Plan
CI, plus manual benchmarks run with
[avalanche](https://github.com/prometheus-community/avalanche)
---
.asf.yaml | 2 +-
.github/workflows/ci.yml | 4 +
.github/workflows/tsbs.yml | 2 +-
Cargo.lock | 8 +-
Cargo.toml | 2 +
docs/example-cluster-0.toml | 2 +-
docs/example-cluster-1.toml | 2 +-
docs/example-standalone-static-routing.toml | 3 +-
docs/minimal.toml | 2 +-
integration_tests/Makefile | 2 +-
.../cases/env/cluster/ddl/partition_table.result | 6 +-
.../cases/env/cluster/ddl/partition_table.sql | 2 +
.../cases/env/local/ddl/query-plan.result | 8 +-
integration_tests/config/horaedb-cluster-0.toml | 2 +-
integration_tests/config/horaedb-cluster-1.toml | 2 +-
integration_tests/config/shard-based-recovery.toml | 2 +-
src/horaedb/Cargo.toml | 2 +-
src/wal/Cargo.toml | 2 +-
src/wal/src/local_storage_impl/record_encoding.rs | 119 ++++++++-------------
src/wal/src/local_storage_impl/segment.rs | 4 +-
20 files changed, 78 insertions(+), 100 deletions(-)
diff --git a/.asf.yaml b/.asf.yaml
index 5a35a869..0040c452 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -39,7 +39,7 @@ github:
protected_branches:
main:
required_pull_request_reviews:
- dismiss_stale_reviews: true
+ dismiss_stale_reviews: false
required_approving_review_count: 1
protected_tags: []
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 2b7bf8ea..a19ad98c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -39,6 +39,10 @@ on:
- 'Cargo.lock'
- '.github/workflows/ci.yml'
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}
+ cancel-in-progress: true
+
# Common environment variables
env:
RUSTFLAGS: "-C debuginfo=1"
diff --git a/.github/workflows/tsbs.yml b/.github/workflows/tsbs.yml
index a0a88d56..7216edbc 100644
--- a/.github/workflows/tsbs.yml
+++ b/.github/workflows/tsbs.yml
@@ -46,7 +46,7 @@ jobs:
- name: Setup Build Environment
run: |
sudo apt update
- sudo apt install --yes protobuf-compiler
+ sudo apt install --yes protobuf-compiler liblzma-dev
- name: Build server
run: |
make build
diff --git a/Cargo.lock b/Cargo.lock
index 8f08f6e2..6a8ad08f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3888,7 +3888,7 @@ checksum =
"348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"
[[package]]
name = "librocksdb_sys"
version = "0.1.0"
-source =
"git+https://github.com/tikv/rust-rocksdb.git?rev=f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f#f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f"
+source =
"git+https://github.com/tikv/rust-rocksdb.git?rev=85e79e52c6ad80b8c547fcb90b3cade64f141fac#85e79e52c6ad80b8c547fcb90b3cade64f141fac"
dependencies = [
"bindgen 0.65.1",
"bzip2-sys",
@@ -3905,7 +3905,7 @@ dependencies = [
[[package]]
name = "libtitan_sys"
version = "0.0.1"
-source =
"git+https://github.com/tikv/rust-rocksdb.git?rev=f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f#f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f"
+source =
"git+https://github.com/tikv/rust-rocksdb.git?rev=85e79e52c6ad80b8c547fcb90b3cade64f141fac#85e79e52c6ad80b8c547fcb90b3cade64f141fac"
dependencies = [
"bzip2-sys",
"cc",
@@ -6307,7 +6307,7 @@ dependencies = [
[[package]]
name = "rocksdb"
version = "0.3.0"
-source =
"git+https://github.com/tikv/rust-rocksdb.git?rev=f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f#f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f"
+source =
"git+https://github.com/tikv/rust-rocksdb.git?rev=85e79e52c6ad80b8c547fcb90b3cade64f141fac#85e79e52c6ad80b8c547fcb90b3cade64f141fac"
dependencies = [
"libc",
"librocksdb_sys",
@@ -7042,7 +7042,7 @@ checksum =
"5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831"
[[package]]
name = "snappy-sys"
version = "0.1.0"
-source =
"git+https://github.com/busyjay/rust-snappy.git?branch=static-link#8c12738bad811397600455d6982aff754ea2ac44"
+source =
"git+https://github.com/tikv/rust-snappy.git?branch=static-link#8c12738bad811397600455d6982aff754ea2ac44"
dependencies = [
"cmake",
"libc",
diff --git a/Cargo.toml b/Cargo.toml
index 2bc85e70..abc24036 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -84,6 +84,8 @@ members = [
"src/wal"
]
+default-members = ["src/horaedb"]
+
[workspace.dependencies]
alloc_tracker = { path = "src/components/alloc_tracker" }
arrow = { version = "49.0.0", features = ["prettyprint"] }
diff --git a/docs/example-cluster-0.toml b/docs/example-cluster-0.toml
index edc80911..169a9630 100644
--- a/docs/example-cluster-0.toml
+++ b/docs/example-cluster-0.toml
@@ -37,7 +37,7 @@ type = "Local"
data_dir = "/tmp/horaedb0"
[analytic.wal]
-type = "RocksDB"
+type = "Local"
data_dir = "/tmp/horaedb0"
[cluster_deployment]
diff --git a/docs/example-cluster-1.toml b/docs/example-cluster-1.toml
index 7d312bec..e293f7cc 100644
--- a/docs/example-cluster-1.toml
+++ b/docs/example-cluster-1.toml
@@ -38,7 +38,7 @@ type = "Local"
data_dir = "/tmp/horaedb1"
[analytic.wal]
-type = "RocksDB"
+type = "Local"
data_dir = "/tmp/horaedb1"
[cluster_deployment]
diff --git a/docs/example-standalone-static-routing.toml
b/docs/example-standalone-static-routing.toml
index e519ea5a..7c7d77ca 100644
--- a/docs/example-standalone-static-routing.toml
+++ b/docs/example-standalone-static-routing.toml
@@ -36,7 +36,7 @@ max_replay_tables_per_batch = 1024
write_group_command_channel_cap = 1024
[analytic.wal]
-type = "RocksDB"
+type = "Local"
data_dir = "/tmp/horaedb1"
[analytic.storage]
@@ -91,4 +91,3 @@ shards = [ 1 ]
[limiter]
write_block_list = ['mytable1']
read_block_list = ['mytable1']
-
diff --git a/docs/minimal.toml b/docs/minimal.toml
index f66046e7..de11aae7 100644
--- a/docs/minimal.toml
+++ b/docs/minimal.toml
@@ -32,7 +32,7 @@ type = "Local"
data_dir = "/tmp/horaedb"
[analytic.wal]
-type = "RocksDB"
+type = "Local"
data_dir = "/tmp/horaedb"
[analytic]
diff --git a/integration_tests/Makefile b/integration_tests/Makefile
index e6ce21bd..fe7fbcdc 100644
--- a/integration_tests/Makefile
+++ b/integration_tests/Makefile
@@ -54,7 +54,7 @@ build-meta:
./build_meta.sh
build-horaedb:
- cd .. && cargo build --bin horaedb-server --features
wal-table-kv,wal-message-queue,wal-rocksdb,wal-local-storage
+ cd .. && make build-debug
build-test:
cargo build
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result
b/integration_tests/cases/env/cluster/ddl/partition_table.result
index 87f0708a..980a7bc1 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -100,10 +100,11 @@
UInt64(16367588166920223437),Timestamp(1651737067000),String("horaedb9"),Int32(0
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:1, metrics=xx\n ScanTable:
table=__partition_table_t_1, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name =
Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start:
Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807)
} }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n
in [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:1, metrics=xx\n ScanTable:
table=__partition_table_t_1, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name =
Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start:
Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807)
} }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n
in [...]
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
@@ -111,10 +112,11 @@ String("Plan with
Metrics"),String("ResolvedPartitionedScan: pushdown_continue:f
-- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0",
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:3, metrics=xx\n ScanTable:
table=__partition_table_t_x, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable:
table=__partition_table_t_x, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable:
table=__partition_table_t_x, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=[\nPredica [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:3, metrics=xx\n ScanTable:
table=__partition_table_t_x, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable:
table=__partition_table_t_x, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable:
table=__partition_table_t_x, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=[\nPredica [...]
ALTER TABLE partition_table_t ADD COLUMN (b string);
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql
b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index 9b056de7..e1f32de5 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -58,6 +58,7 @@ SELECT * from partition_table_t where name in ("horaedb5",
"horaedb6", "horaedb7
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
@@ -65,6 +66,7 @@ EXPLAIN ANALYZE SELECT * from partition_table_t where name =
"ceresdb0";
-- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0",
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
ALTER TABLE partition_table_t ADD COLUMN (b string);
diff --git a/integration_tests/cases/env/local/ddl/query-plan.result
b/integration_tests/cases/env/local/ddl/query-plan.result
index 11b19c14..ee1e27c0 100644
--- a/integration_tests/cases/env/local/ddl/query-plan.result
+++ b/integration_tests/cases/env/local/ddl/query-plan.result
@@ -50,7 +50,7 @@ explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348001000;
plan_type,plan,
-String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >
TimestampMillisecond(1695348001000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001001), exclusive_end:
Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n
iter_num=1\n merge_iter_0:\n init_duration=xxs\n
num_memtables=1\n num_ssts=0\n [...]
+String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >
TimestampMillisecond(1695348001000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001001), exclusive_end:
Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n
iter_num=1\n merge_iter_0:\n init_duration=xxs\n
num_memtables=1\n num_ssts=0\n [...]
-- This query should have higher priority
@@ -60,7 +60,7 @@ explain analyze select t from `03_dml_select_real_time_range`
where t >= 1695348001000 and t < 1695348002000;
plan_type,plan,
-String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=High,
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >=
TimestampMillisecond(1695348001000, None), t <
TimestampMillisecond(1695348002000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001000), exclusive_end:
Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=true\n
iter_num=1\n merge_iter_0:\n init_duration=xxs\n [...]
+String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=High,
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >=
TimestampMillisecond(1695348001000, None), t <
TimestampMillisecond(1695348002000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001000), exclusive_end:
Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=true\n
iter_num=1\n merge_iter_0:\n init_duration=xxs\n [...]
-- This query should have higher priority
@@ -70,7 +70,7 @@ explain analyze select name from
`03_dml_select_real_time_range`
where t >= 1695348001000 and t < 1695348002000;
plan_type,plan,
-String("Plan with Metrics"),String("ProjectionExec: expr=[name@0 as name],
metrics=xx\n ScanTable: table=03_dml_select_real_time_range, parallelism=8,
priority=High, partition_count=UnknownPartitioning(8), metrics=[\nPredicate {
exprs:[t >= TimestampMillisecond(1695348001000, None), t <
TimestampMillisecond(1695348002000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001000), exclusive_end:
Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=true\n iter_nu
[...]
+String("Plan with Metrics"),String("ProjectionExec: expr=[name@0 as name],
metrics=xx\n ScanTable: table=03_dml_select_real_time_range, parallelism=8,
priority=High, partition_count=UnknownPartitioning(8), metrics=[\nPredicate {
exprs:[t >= TimestampMillisecond(1695348001000, None), t <
TimestampMillisecond(1695348002000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001000), exclusive_end:
Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=true\n iter_nu
[...]
-- This query should not include memtable
@@ -135,7 +135,7 @@ explain analyze select t from `03_append_mode_table`
where t >= 1695348001000 and name = 'ceresdb';
plan_type,plan,
-String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t],
metrics=xx\n ScanTable: table=03_append_mode_table, parallelism=8,
priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate {
exprs:[t >= TimestampMillisecond(1695348001000, None), name =
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start:
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) }
}\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n
num_memtables= [...]
+String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t],
metrics=xx\n ScanTable: table=03_append_mode_table, parallelism=8,
priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate {
exprs:[t >= TimestampMillisecond(1695348001000, None), name =
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start:
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) }
}\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n
num_memtables= [...]
-- Should just fetch projected columns from SST
diff --git a/integration_tests/config/horaedb-cluster-0.toml
b/integration_tests/config/horaedb-cluster-0.toml
index da6170bc..3a585bdf 100644
--- a/integration_tests/config/horaedb-cluster-0.toml
+++ b/integration_tests/config/horaedb-cluster-0.toml
@@ -37,7 +37,7 @@ type = "Local"
data_dir = "/tmp/horaedb0"
[analytic.wal]
-type = "RocksDB"
+type = "Local"
data_dir = "/tmp/horaedb0"
[cluster_deployment]
diff --git a/integration_tests/config/horaedb-cluster-1.toml
b/integration_tests/config/horaedb-cluster-1.toml
index f1f71732..e3943a69 100644
--- a/integration_tests/config/horaedb-cluster-1.toml
+++ b/integration_tests/config/horaedb-cluster-1.toml
@@ -38,7 +38,7 @@ type = "Local"
data_dir = "/tmp/horaedb1"
[analytic.wal]
-type = "RocksDB"
+type = "Local"
data_dir = "/tmp/horaedb1"
[cluster_deployment]
diff --git a/integration_tests/config/shard-based-recovery.toml
b/integration_tests/config/shard-based-recovery.toml
index 78b09a8f..92e56f6a 100644
--- a/integration_tests/config/shard-based-recovery.toml
+++ b/integration_tests/config/shard-based-recovery.toml
@@ -34,5 +34,5 @@ type = "Local"
data_dir = "/tmp/horaedb"
[analytic.wal]
-type = "RocksDB"
+type = "Local"
data_dir = "/tmp/horaedb"
diff --git a/src/horaedb/Cargo.toml b/src/horaedb/Cargo.toml
index 2abfa49e..ce505105 100644
--- a/src/horaedb/Cargo.toml
+++ b/src/horaedb/Cargo.toml
@@ -31,7 +31,7 @@ workspace = true
workspace = true
[features]
-default = ["wal-rocksdb", "wal-table-kv", "wal-message-queue",
"wal-local-storage"]
+default = ["wal-table-kv", "wal-message-queue", "wal-local-storage"]
wal-table-kv = ["wal/wal-table-kv", "analytic_engine/wal-table-kv"]
wal-message-queue = ["wal/wal-message-queue",
"analytic_engine/wal-message-queue"]
wal-rocksdb = ["wal/wal-rocksdb", "analytic_engine/wal-rocksdb"]
diff --git a/src/wal/Cargo.toml b/src/wal/Cargo.toml
index 14640164..ce59ae55 100644
--- a/src/wal/Cargo.toml
+++ b/src/wal/Cargo.toml
@@ -32,7 +32,7 @@ workspace = true
[dependencies.rocksdb]
git = "https://github.com/tikv/rust-rocksdb.git"
-rev = "f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f"
+rev = "85e79e52c6ad80b8c547fcb90b3cade64f141fac"
features = ["portable"]
optional = true
diff --git a/src/wal/src/local_storage_impl/record_encoding.rs
b/src/wal/src/local_storage_impl/record_encoding.rs
index e91d3f5a..f7b570c7 100644
--- a/src/wal/src/local_storage_impl/record_encoding.rs
+++ b/src/wal/src/local_storage_impl/record_encoding.rs
@@ -17,6 +17,7 @@
use bytes_ext::{Buf, BufMut, SafeBuf, SafeBufMut};
use codec::Encoder;
+use crc32fast::Hasher;
use generic_error::GenericError;
use macros::define_result;
use snafu::{ensure, Backtrace, ResultExt, Snafu};
@@ -26,7 +27,11 @@ pub const NEWEST_RECORD_ENCODING_VERSION: u8 =
RECORD_ENCODING_V0;
pub const VERSION_SIZE: usize = 1;
pub const CRC_SIZE: usize = 4;
-pub const RECORD_LENGTH_SIZE: usize = 4;
+pub const TABLE_ID_SIZE: usize = 8;
+pub const SEQUENCE_NUM_SIZE: usize = 8;
+pub const VALUE_LENGTH_SIZE: usize = 4;
+pub const RECORD_HEADER_SIZE: usize =
+ VERSION_SIZE + CRC_SIZE + TABLE_ID_SIZE + SEQUENCE_NUM_SIZE +
VALUE_LENGTH_SIZE;
#[derive(Debug, Snafu)]
pub enum Error {
@@ -57,10 +62,10 @@ define_result!(Error);
/// Record format:
///
/// ```text
-///
+---------+--------+--------+------------+--------------+--------------+-------+
-/// | version | crc | length | table id | sequence num | value length |
value |
-/// | (u8) | (u32) | (u32) | (u64) | (u64) | (u32) |
|
-///
+---------+--------+--------+------------+--------------+--------------+-------+
+/// +---------+--------+------------+--------------+--------------+-------+
+/// | version | crc | table id | sequence num | value length | value |
+/// | (u8) | (u32) | (u64) | (u64) | (u32) |(bytes)|
+/// +---------+--------+------------+--------------+--------------+-------+
/// ```
#[derive(Debug)]
pub struct Record {
@@ -70,9 +75,6 @@ pub struct Record {
/// The CRC checksum of the record.
pub crc: u32,
- /// The length of the record (excluding version, crc and length).
- pub length: u32,
-
/// Identifier for tables.
pub table_id: u64,
@@ -87,43 +89,32 @@ pub struct Record {
}
impl Record {
- pub fn new(table_id: u64, sequence_num: u64, value: &[u8]) -> Result<Self>
{
- let mut record = Record {
+ pub fn new(table_id: u64, sequence_num: u64, value: &[u8]) -> Self {
+ Record {
version: NEWEST_RECORD_ENCODING_VERSION,
- crc: 0,
- length: (8 + 8 + 4 + value.len()) as u32,
+ crc: compute_crc32(table_id, sequence_num, value),
table_id,
sequence_num,
value_length: value.len() as u32,
value: value.to_vec(),
- };
-
- // Calculate CRC
- let mut buf = Vec::new();
- buf.try_put_u64(table_id).context(Encoding)?;
- buf.try_put_u64(sequence_num).context(Encoding)?;
- buf.try_put_u32(record.value_length).context(Encoding)?;
- buf.extend_from_slice(value);
- record.crc = crc32fast::hash(&buf);
-
- Ok(record)
+ }
}
// Return the length of the record
pub fn len(&self) -> usize {
- VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE + self.length as usize
+ RECORD_HEADER_SIZE + self.value_length as usize
}
}
#[derive(Clone, Debug)]
pub struct RecordEncoding {
- version: u8,
+ expected_version: u8,
}
impl RecordEncoding {
pub fn newest() -> Self {
Self {
- version: NEWEST_RECORD_ENCODING_VERSION,
+ expected_version: NEWEST_RECORD_ENCODING_VERSION,
}
}
}
@@ -134,16 +125,15 @@ impl Encoder<Record> for RecordEncoding {
fn encode<B: BufMut>(&self, buf: &mut B, record: &Record) -> Result<()> {
// Verify version
ensure!(
- record.version == self.version,
+ record.version == self.expected_version,
Version {
- expected: self.version,
+ expected: self.expected_version,
given: record.version
}
);
buf.try_put_u8(record.version).context(Encoding)?;
buf.try_put_u32(record.crc).context(Encoding)?;
- buf.try_put_u32(record.length).context(Encoding)?;
buf.try_put_u64(record.table_id).context(Encoding)?;
buf.try_put_u64(record.sequence_num).context(Encoding)?;
buf.try_put_u32(record.value_length).context(Encoding)?;
@@ -160,9 +150,9 @@ impl RecordEncoding {
pub fn decode<'a>(&'a self, mut buf: &'a [u8]) -> Result<Record> {
// Ensure that buf is not shorter than the shortest record.
ensure!(
- buf.remaining() >= VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE,
+ buf.remaining() >= RECORD_HEADER_SIZE,
LengthMismatch {
- expected: VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE,
+ expected: RECORD_HEADER_SIZE,
actual: buf.remaining()
}
);
@@ -172,38 +162,22 @@ impl RecordEncoding {
// Verify version
ensure!(
- version == self.version,
+ version == self.expected_version,
Version {
- expected: self.version,
+ expected: self.expected_version,
given: version
}
);
- // Read CRC
let crc = buf.try_get_u32().context(Decoding)?;
-
- // Read length
- let length = buf.try_get_u32().context(Decoding)?;
- ensure!(
- length > 0,
- LengthMismatch {
- expected: 1usize,
- actual: 0usize
- }
- );
-
- // Ensure the buf is long enough
- ensure!(
- buf.remaining() >= length as usize,
- LengthMismatch {
- expected: length as usize,
- actual: buf.remaining()
- }
- );
+ let table_id = buf.try_get_u64().context(Decoding)?;
+ let sequence_num = buf.try_get_u64().context(Decoding)?;
+ let value_length = buf.try_get_u32().context(Decoding)?;
+ let mut value = vec![0; value_length as usize];
+ buf.try_copy_to_slice(&mut value).context(Decoding)?;
// Verify CRC
- let data = &buf[0..length as usize];
- let computed_crc = crc32fast::hash(data);
+ let computed_crc = compute_crc32(table_id, sequence_num, &value);
ensure!(
computed_crc == crc,
ChecksumMismatch {
@@ -212,23 +186,9 @@ impl RecordEncoding {
}
);
- // Read table id
- let table_id = buf.try_get_u64().context(Decoding)?;
-
- // Read sequence number
- let sequence_num = buf.try_get_u64().context(Decoding)?;
-
- // Read value length
- let value_length = buf.try_get_u32().context(Decoding)?;
-
- // Read value
- let value = buf[0..value_length as usize].to_vec();
- buf.advance(value_length as usize);
-
Ok(Record {
version,
crc,
- length,
table_id,
sequence_num,
value_length,
@@ -237,6 +197,18 @@ impl RecordEncoding {
}
}
+/// The crc32 checksum is calculated over the table_id, sequence_num,
+/// value_length and value.
+// This function does the same with `crc32fast::hash`.
+fn compute_crc32(table_id: u64, seq_num: u64, value: &[u8]) -> u32 {
+ let mut h = Hasher::new();
+ h.update(&table_id.to_le_bytes());
+ h.update(&seq_num.to_le_bytes());
+ h.update(&value.len().to_le_bytes());
+ h.update(value);
+ h.finalize()
+}
+
#[cfg(test)]
mod tests {
use bytes_ext::BytesMut;
@@ -245,11 +217,11 @@ mod tests {
use crate::local_storage_impl::record_encoding::{Record, RecordEncoding};
#[test]
- fn test_record_encoding() {
+ fn test_local_wal_record_encoding() {
let table_id = 1;
let sequence_num = 2;
let value = b"test_value";
- let record = Record::new(table_id, sequence_num, value).unwrap();
+ let record = Record::new(table_id, sequence_num, value);
let encoder = RecordEncoding::newest();
let mut buf = BytesMut::new();
@@ -260,11 +232,11 @@ mod tests {
}
#[test]
- fn test_record_decoding() {
+ fn test_local_wal_record_decoding() {
let table_id = 1;
let sequence_num = 2;
let value = b"test_value";
- let record = Record::new(table_id, sequence_num, value).unwrap();
+ let record = Record::new(table_id, sequence_num, value);
let encoder = RecordEncoding::newest();
let mut buf = BytesMut::new();
@@ -274,7 +246,6 @@ mod tests {
assert_eq!(decoded_record.version, record.version);
assert_eq!(decoded_record.crc, record.crc);
- assert_eq!(decoded_record.length, record.length);
assert_eq!(decoded_record.table_id, record.table_id);
assert_eq!(decoded_record.sequence_num, record.sequence_num);
assert_eq!(decoded_record.value_length, record.value_length);
diff --git a/src/wal/src/local_storage_impl/segment.rs
b/src/wal/src/local_storage_impl/segment.rs
index b66701b2..5613021c 100644
--- a/src/wal/src/local_storage_impl/segment.rs
+++ b/src/wal/src/local_storage_impl/segment.rs
@@ -760,9 +760,7 @@ impl Region {
for entry in &batch.entries {
// Encode the record
- let record = Record::new(table_id, next_sequence_num,
&entry.payload)
- .box_err()
- .context(Encoding)?;
+ let record = Record::new(table_id, next_sequence_num,
&entry.payload);
self.record_encoding
.encode(&mut data, &record)
.box_err()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]