This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 4642a356 fix: remove redundant length field in wal record (#1576)
4642a356 is described below

commit 4642a356c3df74c838e136304015db9b83d17960
Author: Jiacai Liu <[email protected]>
AuthorDate: Thu Oct 17 10:19:18 2024 +0800

    fix: remove redundant length field in wal record (#1576)
    
    ## Rationale
    The `length` field is not required in the wal record; it duplicates
    `value_length`.
    
    ## Detailed Changes
    - Remove length from wal record
    - Remove rocksdb-wal from default features
    
    ## Test Plan
    CI, plus manual benchmarking with
    [avalanche](https://github.com/prometheus-community/avalanche)
---
 .asf.yaml                                          |   2 +-
 .github/workflows/ci.yml                           |   4 +
 .github/workflows/tsbs.yml                         |   2 +-
 Cargo.lock                                         |   8 +-
 Cargo.toml                                         |   2 +
 docs/example-cluster-0.toml                        |   2 +-
 docs/example-cluster-1.toml                        |   2 +-
 docs/example-standalone-static-routing.toml        |   3 +-
 docs/minimal.toml                                  |   2 +-
 integration_tests/Makefile                         |   2 +-
 .../cases/env/cluster/ddl/partition_table.result   |   6 +-
 .../cases/env/cluster/ddl/partition_table.sql      |   2 +
 .../cases/env/local/ddl/query-plan.result          |   8 +-
 integration_tests/config/horaedb-cluster-0.toml    |   2 +-
 integration_tests/config/horaedb-cluster-1.toml    |   2 +-
 integration_tests/config/shard-based-recovery.toml |   2 +-
 src/horaedb/Cargo.toml                             |   2 +-
 src/wal/Cargo.toml                                 |   2 +-
 src/wal/src/local_storage_impl/record_encoding.rs  | 119 ++++++++-------------
 src/wal/src/local_storage_impl/segment.rs          |   4 +-
 20 files changed, 78 insertions(+), 100 deletions(-)

diff --git a/.asf.yaml b/.asf.yaml
index 5a35a869..0040c452 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -39,7 +39,7 @@ github:
   protected_branches:
     main:
       required_pull_request_reviews:
-        dismiss_stale_reviews: true
+        dismiss_stale_reviews: false
         required_approving_review_count: 1
   protected_tags: []
 
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 2b7bf8ea..a19ad98c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -39,6 +39,10 @@ on:
       - 'Cargo.lock'
       - '.github/workflows/ci.yml'
 
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
 # Common environment variables
 env:
   RUSTFLAGS: "-C debuginfo=1"
diff --git a/.github/workflows/tsbs.yml b/.github/workflows/tsbs.yml
index a0a88d56..7216edbc 100644
--- a/.github/workflows/tsbs.yml
+++ b/.github/workflows/tsbs.yml
@@ -46,7 +46,7 @@ jobs:
       - name: Setup Build Environment
         run: |
           sudo apt update
-          sudo apt install --yes protobuf-compiler
+          sudo apt install --yes protobuf-compiler liblzma-dev
       - name: Build server
         run: |
           make build
diff --git a/Cargo.lock b/Cargo.lock
index 8f08f6e2..6a8ad08f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3888,7 +3888,7 @@ checksum = 
"348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"
 [[package]]
 name = "librocksdb_sys"
 version = "0.1.0"
-source = 
"git+https://github.com/tikv/rust-rocksdb.git?rev=f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f#f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f";
+source = 
"git+https://github.com/tikv/rust-rocksdb.git?rev=85e79e52c6ad80b8c547fcb90b3cade64f141fac#85e79e52c6ad80b8c547fcb90b3cade64f141fac";
 dependencies = [
  "bindgen 0.65.1",
  "bzip2-sys",
@@ -3905,7 +3905,7 @@ dependencies = [
 [[package]]
 name = "libtitan_sys"
 version = "0.0.1"
-source = 
"git+https://github.com/tikv/rust-rocksdb.git?rev=f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f#f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f";
+source = 
"git+https://github.com/tikv/rust-rocksdb.git?rev=85e79e52c6ad80b8c547fcb90b3cade64f141fac#85e79e52c6ad80b8c547fcb90b3cade64f141fac";
 dependencies = [
  "bzip2-sys",
  "cc",
@@ -6307,7 +6307,7 @@ dependencies = [
 [[package]]
 name = "rocksdb"
 version = "0.3.0"
-source = 
"git+https://github.com/tikv/rust-rocksdb.git?rev=f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f#f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f";
+source = 
"git+https://github.com/tikv/rust-rocksdb.git?rev=85e79e52c6ad80b8c547fcb90b3cade64f141fac#85e79e52c6ad80b8c547fcb90b3cade64f141fac";
 dependencies = [
  "libc",
  "librocksdb_sys",
@@ -7042,7 +7042,7 @@ checksum = 
"5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831"
 [[package]]
 name = "snappy-sys"
 version = "0.1.0"
-source = 
"git+https://github.com/busyjay/rust-snappy.git?branch=static-link#8c12738bad811397600455d6982aff754ea2ac44";
+source = 
"git+https://github.com/tikv/rust-snappy.git?branch=static-link#8c12738bad811397600455d6982aff754ea2ac44";
 dependencies = [
  "cmake",
  "libc",
diff --git a/Cargo.toml b/Cargo.toml
index 2bc85e70..abc24036 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -84,6 +84,8 @@ members = [
     "src/wal"
 ]
 
+default-members = ["src/horaedb"]
+
 [workspace.dependencies]
 alloc_tracker = { path = "src/components/alloc_tracker" }
 arrow = { version = "49.0.0", features = ["prettyprint"] }
diff --git a/docs/example-cluster-0.toml b/docs/example-cluster-0.toml
index edc80911..169a9630 100644
--- a/docs/example-cluster-0.toml
+++ b/docs/example-cluster-0.toml
@@ -37,7 +37,7 @@ type = "Local"
 data_dir = "/tmp/horaedb0"
 
 [analytic.wal]
-type = "RocksDB"
+type = "Local"
 data_dir = "/tmp/horaedb0"
 
 [cluster_deployment]
diff --git a/docs/example-cluster-1.toml b/docs/example-cluster-1.toml
index 7d312bec..e293f7cc 100644
--- a/docs/example-cluster-1.toml
+++ b/docs/example-cluster-1.toml
@@ -38,7 +38,7 @@ type = "Local"
 data_dir = "/tmp/horaedb1"
 
 [analytic.wal]
-type = "RocksDB"
+type = "Local"
 data_dir = "/tmp/horaedb1"
 
 [cluster_deployment]
diff --git a/docs/example-standalone-static-routing.toml 
b/docs/example-standalone-static-routing.toml
index e519ea5a..7c7d77ca 100644
--- a/docs/example-standalone-static-routing.toml
+++ b/docs/example-standalone-static-routing.toml
@@ -36,7 +36,7 @@ max_replay_tables_per_batch = 1024
 write_group_command_channel_cap = 1024
 
 [analytic.wal]
-type = "RocksDB"
+type = "Local"
 data_dir = "/tmp/horaedb1"
 
 [analytic.storage]
@@ -91,4 +91,3 @@ shards = [ 1 ]
 [limiter]
 write_block_list = ['mytable1']
 read_block_list = ['mytable1']
-
diff --git a/docs/minimal.toml b/docs/minimal.toml
index f66046e7..de11aae7 100644
--- a/docs/minimal.toml
+++ b/docs/minimal.toml
@@ -32,7 +32,7 @@ type = "Local"
 data_dir = "/tmp/horaedb"
 
 [analytic.wal]
-type = "RocksDB"
+type = "Local"
 data_dir = "/tmp/horaedb"
 
 [analytic]
diff --git a/integration_tests/Makefile b/integration_tests/Makefile
index e6ce21bd..fe7fbcdc 100644
--- a/integration_tests/Makefile
+++ b/integration_tests/Makefile
@@ -54,7 +54,7 @@ build-meta:
        ./build_meta.sh
 
 build-horaedb:
-       cd .. && cargo build --bin horaedb-server --features 
wal-table-kv,wal-message-queue,wal-rocksdb,wal-local-storage
+       cd .. && make build-debug
 
 build-test:
        cargo build
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result 
b/integration_tests/cases/env/cluster/ddl/partition_table.result
index 87f0708a..980a7bc1 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -100,10 +100,11 @@ 
UInt64(16367588166920223437),Timestamp(1651737067000),String("horaedb9"),Int32(0
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
 -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
 -- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
 EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
 
 plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:1, metrics=xx\n  ScanTable: 
table=__partition_table_t_1, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = 
Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: 
Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) 
} }\nscan_table:\n    do_merge_sort=true\n    iter_num=1\n    merge_iter_0:\n   
     in [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:1, metrics=xx\n  ScanTable: 
table=__partition_table_t_1, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = 
Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: 
Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) 
} }\nscan_table:\n    do_merge_sort=true\n    iter_num=1\n    merge_iter_0:\n   
     in [...]
 
 
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
@@ -111,10 +112,11 @@ String("Plan with 
Metrics"),String("ResolvedPartitionedScan: pushdown_continue:f
 -- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
 -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
 -- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
 EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", 
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
 
 plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:3, metrics=xx\n  ScanTable: 
table=__partition_table_t_x, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=xx\n  ScanTable: 
table=__partition_table_t_x, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=xx\n  ScanTable: 
table=__partition_table_t_x, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredica [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:3, metrics=xx\n  ScanTable: 
table=__partition_table_t_x, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=xx\n  ScanTable: 
table=__partition_table_t_x, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=xx\n  ScanTable: 
table=__partition_table_t_x, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredica [...]
 
 
 ALTER TABLE partition_table_t ADD COLUMN (b string);
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql 
b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index 9b056de7..e1f32de5 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -58,6 +58,7 @@ SELECT * from partition_table_t where name in ("horaedb5", 
"horaedb6", "horaedb7
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
 -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
 -- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
 EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
 
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
@@ -65,6 +66,7 @@ EXPLAIN ANALYZE SELECT * from partition_table_t where name = 
"ceresdb0";
 -- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
 -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
 -- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
 EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", 
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
 
 ALTER TABLE partition_table_t ADD COLUMN (b string);
diff --git a/integration_tests/cases/env/local/ddl/query-plan.result 
b/integration_tests/cases/env/local/ddl/query-plan.result
index 11b19c14..ee1e27c0 100644
--- a/integration_tests/cases/env/local/ddl/query-plan.result
+++ b/integration_tests/cases/env/local/ddl/query-plan.result
@@ -50,7 +50,7 @@ explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348001000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t > 
TimestampMillisecond(1695348001000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001001), exclusive_end: 
Timestamp(9223372036854775807) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=1\n    merge_iter_0:\n        init_duration=xxs\n        
num_memtables=1\n        num_ssts=0\n [...]
+String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t > 
TimestampMillisecond(1695348001000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001001), exclusive_end: 
Timestamp(9223372036854775807) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=1\n    merge_iter_0:\n        init_duration=xxs\n        
num_memtables=1\n        num_ssts=0\n [...]
 
 
 -- This query should have higher priority
@@ -60,7 +60,7 @@ explain analyze select t from `03_dml_select_real_time_range`
 where t >= 1695348001000 and t < 1695348002000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=High, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >= 
TimestampMillisecond(1695348001000, None), t < 
TimestampMillisecond(1695348002000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001000), exclusive_end: 
Timestamp(1695348002000) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=1\n    merge_iter_0:\n        init_duration=xxs\n   [...]
+String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=High, 
partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[t >= 
TimestampMillisecond(1695348001000, None), t < 
TimestampMillisecond(1695348002000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001000), exclusive_end: 
Timestamp(1695348002000) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=1\n    merge_iter_0:\n        init_duration=xxs\n   [...]
 
 
 -- This query should have higher priority
@@ -70,7 +70,7 @@ explain analyze select name from 
`03_dml_select_real_time_range`
 where t >= 1695348001000 and t < 1695348002000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ProjectionExec: expr=[name@0 as name], 
metrics=xx\n  ScanTable: table=03_dml_select_real_time_range, parallelism=8, 
priority=High, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { 
exprs:[t >= TimestampMillisecond(1695348001000, None), t < 
TimestampMillisecond(1695348002000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001000), exclusive_end: 
Timestamp(1695348002000) } }\nscan_table:\n    do_merge_sort=true\n    iter_nu 
[...]
+String("Plan with Metrics"),String("ProjectionExec: expr=[name@0 as name], 
metrics=xx\n  ScanTable: table=03_dml_select_real_time_range, parallelism=8, 
priority=High, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { 
exprs:[t >= TimestampMillisecond(1695348001000, None), t < 
TimestampMillisecond(1695348002000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001000), exclusive_end: 
Timestamp(1695348002000) } }\nscan_table:\n    do_merge_sort=true\n    iter_nu 
[...]
 
 
 -- This query should not include memtable
@@ -135,7 +135,7 @@ explain analyze select t from `03_append_mode_table`
 where t >= 1695348001000 and name = 'ceresdb';
 
 plan_type,plan,
-String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], 
metrics=xx\n  ScanTable: table=03_append_mode_table, parallelism=8, 
priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { 
exprs:[t >= TimestampMillisecond(1695348001000, None), name = 
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: 
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } 
}\nscan_table:\n    do_merge_sort=false\n    chain_iter_0:\n        
num_memtables= [...]
+String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], 
metrics=xx\n  ScanTable: table=03_append_mode_table, parallelism=8, 
priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { 
exprs:[t >= TimestampMillisecond(1695348001000, None), name = 
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: 
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } 
}\nscan_table:\n    do_merge_sort=false\n    chain_iter_0:\n        
num_memtables= [...]
 
 
 -- Should just fetch projected columns from SST
diff --git a/integration_tests/config/horaedb-cluster-0.toml 
b/integration_tests/config/horaedb-cluster-0.toml
index da6170bc..3a585bdf 100644
--- a/integration_tests/config/horaedb-cluster-0.toml
+++ b/integration_tests/config/horaedb-cluster-0.toml
@@ -37,7 +37,7 @@ type = "Local"
 data_dir = "/tmp/horaedb0"
 
 [analytic.wal]
-type = "RocksDB"
+type = "Local"
 data_dir = "/tmp/horaedb0"
 
 [cluster_deployment]
diff --git a/integration_tests/config/horaedb-cluster-1.toml 
b/integration_tests/config/horaedb-cluster-1.toml
index f1f71732..e3943a69 100644
--- a/integration_tests/config/horaedb-cluster-1.toml
+++ b/integration_tests/config/horaedb-cluster-1.toml
@@ -38,7 +38,7 @@ type = "Local"
 data_dir = "/tmp/horaedb1"
 
 [analytic.wal]
-type = "RocksDB"
+type = "Local"
 data_dir = "/tmp/horaedb1"
 
 [cluster_deployment]
diff --git a/integration_tests/config/shard-based-recovery.toml 
b/integration_tests/config/shard-based-recovery.toml
index 78b09a8f..92e56f6a 100644
--- a/integration_tests/config/shard-based-recovery.toml
+++ b/integration_tests/config/shard-based-recovery.toml
@@ -34,5 +34,5 @@ type = "Local"
 data_dir = "/tmp/horaedb"
 
 [analytic.wal]
-type = "RocksDB"
+type = "Local"
 data_dir = "/tmp/horaedb"
diff --git a/src/horaedb/Cargo.toml b/src/horaedb/Cargo.toml
index 2abfa49e..ce505105 100644
--- a/src/horaedb/Cargo.toml
+++ b/src/horaedb/Cargo.toml
@@ -31,7 +31,7 @@ workspace = true
 workspace = true
 
 [features]
-default = ["wal-rocksdb", "wal-table-kv", "wal-message-queue", 
"wal-local-storage"]
+default = ["wal-table-kv", "wal-message-queue", "wal-local-storage"]
 wal-table-kv = ["wal/wal-table-kv", "analytic_engine/wal-table-kv"]
 wal-message-queue = ["wal/wal-message-queue", 
"analytic_engine/wal-message-queue"]
 wal-rocksdb = ["wal/wal-rocksdb", "analytic_engine/wal-rocksdb"]
diff --git a/src/wal/Cargo.toml b/src/wal/Cargo.toml
index 14640164..ce59ae55 100644
--- a/src/wal/Cargo.toml
+++ b/src/wal/Cargo.toml
@@ -32,7 +32,7 @@ workspace = true
 
 [dependencies.rocksdb]
 git = "https://github.com/tikv/rust-rocksdb.git";
-rev = "f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f"
+rev = "85e79e52c6ad80b8c547fcb90b3cade64f141fac"
 features = ["portable"]
 optional = true
 
diff --git a/src/wal/src/local_storage_impl/record_encoding.rs 
b/src/wal/src/local_storage_impl/record_encoding.rs
index e91d3f5a..f7b570c7 100644
--- a/src/wal/src/local_storage_impl/record_encoding.rs
+++ b/src/wal/src/local_storage_impl/record_encoding.rs
@@ -17,6 +17,7 @@
 
 use bytes_ext::{Buf, BufMut, SafeBuf, SafeBufMut};
 use codec::Encoder;
+use crc32fast::Hasher;
 use generic_error::GenericError;
 use macros::define_result;
 use snafu::{ensure, Backtrace, ResultExt, Snafu};
@@ -26,7 +27,11 @@ pub const NEWEST_RECORD_ENCODING_VERSION: u8 = 
RECORD_ENCODING_V0;
 
 pub const VERSION_SIZE: usize = 1;
 pub const CRC_SIZE: usize = 4;
-pub const RECORD_LENGTH_SIZE: usize = 4;
+pub const TABLE_ID_SIZE: usize = 8;
+pub const SEQUENCE_NUM_SIZE: usize = 8;
+pub const VALUE_LENGTH_SIZE: usize = 4;
+pub const RECORD_HEADER_SIZE: usize =
+    VERSION_SIZE + CRC_SIZE + TABLE_ID_SIZE + SEQUENCE_NUM_SIZE + 
VALUE_LENGTH_SIZE;
 
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -57,10 +62,10 @@ define_result!(Error);
 /// Record format:
 ///
 /// ```text
-/// 
+---------+--------+--------+------------+--------------+--------------+-------+
-/// | version |  crc   | length |  table id  | sequence num | value length | 
value |
-/// |  (u8)   | (u32)  | (u32)  |   (u64)    |    (u64)     |     (u32)    |   
    |
-/// 
+---------+--------+--------+------------+--------------+--------------+-------+
+/// +---------+--------+------------+--------------+--------------+-------+
+/// | version |  crc   |  table id  | sequence num | value length | value |
+/// |  (u8)   | (u32)  |   (u64)    |    (u64)     |     (u32)    |(bytes)|
+/// +---------+--------+------------+--------------+--------------+-------+
 /// ```
 #[derive(Debug)]
 pub struct Record {
@@ -70,9 +75,6 @@ pub struct Record {
     /// The CRC checksum of the record.
     pub crc: u32,
 
-    /// The length of the record (excluding version, crc and length).
-    pub length: u32,
-
     /// Identifier for tables.
     pub table_id: u64,
 
@@ -87,43 +89,32 @@ pub struct Record {
 }
 
 impl Record {
-    pub fn new(table_id: u64, sequence_num: u64, value: &[u8]) -> Result<Self> 
{
-        let mut record = Record {
+    pub fn new(table_id: u64, sequence_num: u64, value: &[u8]) -> Self {
+        Record {
             version: NEWEST_RECORD_ENCODING_VERSION,
-            crc: 0,
-            length: (8 + 8 + 4 + value.len()) as u32,
+            crc: compute_crc32(table_id, sequence_num, value),
             table_id,
             sequence_num,
             value_length: value.len() as u32,
             value: value.to_vec(),
-        };
-
-        // Calculate CRC
-        let mut buf = Vec::new();
-        buf.try_put_u64(table_id).context(Encoding)?;
-        buf.try_put_u64(sequence_num).context(Encoding)?;
-        buf.try_put_u32(record.value_length).context(Encoding)?;
-        buf.extend_from_slice(value);
-        record.crc = crc32fast::hash(&buf);
-
-        Ok(record)
+        }
     }
 
     // Return the length of the record
     pub fn len(&self) -> usize {
-        VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE + self.length as usize
+        RECORD_HEADER_SIZE + self.value_length as usize
     }
 }
 
 #[derive(Clone, Debug)]
 pub struct RecordEncoding {
-    version: u8,
+    expected_version: u8,
 }
 
 impl RecordEncoding {
     pub fn newest() -> Self {
         Self {
-            version: NEWEST_RECORD_ENCODING_VERSION,
+            expected_version: NEWEST_RECORD_ENCODING_VERSION,
         }
     }
 }
@@ -134,16 +125,15 @@ impl Encoder<Record> for RecordEncoding {
     fn encode<B: BufMut>(&self, buf: &mut B, record: &Record) -> Result<()> {
         // Verify version
         ensure!(
-            record.version == self.version,
+            record.version == self.expected_version,
             Version {
-                expected: self.version,
+                expected: self.expected_version,
                 given: record.version
             }
         );
 
         buf.try_put_u8(record.version).context(Encoding)?;
         buf.try_put_u32(record.crc).context(Encoding)?;
-        buf.try_put_u32(record.length).context(Encoding)?;
         buf.try_put_u64(record.table_id).context(Encoding)?;
         buf.try_put_u64(record.sequence_num).context(Encoding)?;
         buf.try_put_u32(record.value_length).context(Encoding)?;
@@ -160,9 +150,9 @@ impl RecordEncoding {
     pub fn decode<'a>(&'a self, mut buf: &'a [u8]) -> Result<Record> {
         // Ensure that buf is not shorter than the shortest record.
         ensure!(
-            buf.remaining() >= VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE,
+            buf.remaining() >= RECORD_HEADER_SIZE,
             LengthMismatch {
-                expected: VERSION_SIZE + CRC_SIZE + RECORD_LENGTH_SIZE,
+                expected: RECORD_HEADER_SIZE,
                 actual: buf.remaining()
             }
         );
@@ -172,38 +162,22 @@ impl RecordEncoding {
 
         // Verify version
         ensure!(
-            version == self.version,
+            version == self.expected_version,
             Version {
-                expected: self.version,
+                expected: self.expected_version,
                 given: version
             }
         );
 
-        // Read CRC
         let crc = buf.try_get_u32().context(Decoding)?;
-
-        // Read length
-        let length = buf.try_get_u32().context(Decoding)?;
-        ensure!(
-            length > 0,
-            LengthMismatch {
-                expected: 1usize,
-                actual: 0usize
-            }
-        );
-
-        // Ensure the buf is long enough
-        ensure!(
-            buf.remaining() >= length as usize,
-            LengthMismatch {
-                expected: length as usize,
-                actual: buf.remaining()
-            }
-        );
+        let table_id = buf.try_get_u64().context(Decoding)?;
+        let sequence_num = buf.try_get_u64().context(Decoding)?;
+        let value_length = buf.try_get_u32().context(Decoding)?;
+        let mut value = vec![0; value_length as usize];
+        buf.try_copy_to_slice(&mut value).context(Decoding)?;
 
         // Verify CRC
-        let data = &buf[0..length as usize];
-        let computed_crc = crc32fast::hash(data);
+        let computed_crc = compute_crc32(table_id, sequence_num, &value);
         ensure!(
             computed_crc == crc,
             ChecksumMismatch {
@@ -212,23 +186,9 @@ impl RecordEncoding {
             }
         );
 
-        // Read table id
-        let table_id = buf.try_get_u64().context(Decoding)?;
-
-        // Read sequence number
-        let sequence_num = buf.try_get_u64().context(Decoding)?;
-
-        // Read value length
-        let value_length = buf.try_get_u32().context(Decoding)?;
-
-        // Read value
-        let value = buf[0..value_length as usize].to_vec();
-        buf.advance(value_length as usize);
-
         Ok(Record {
             version,
             crc,
-            length,
             table_id,
             sequence_num,
             value_length,
@@ -237,6 +197,18 @@ impl RecordEncoding {
     }
 }
 
+/// The crc32 checksum is calculated over the table_id, sequence_num,
+/// value_length and value.
+// This function does the same with `crc32fast::hash`.
+fn compute_crc32(table_id: u64, seq_num: u64, value: &[u8]) -> u32 {
+    let mut h = Hasher::new();
+    h.update(&table_id.to_le_bytes());
+    h.update(&seq_num.to_le_bytes());
+    h.update(&value.len().to_le_bytes());
+    h.update(value);
+    h.finalize()
+}
+
 #[cfg(test)]
 mod tests {
     use bytes_ext::BytesMut;
@@ -245,11 +217,11 @@ mod tests {
     use crate::local_storage_impl::record_encoding::{Record, RecordEncoding};
 
     #[test]
-    fn test_record_encoding() {
+    fn test_local_wal_record_encoding() {
         let table_id = 1;
         let sequence_num = 2;
         let value = b"test_value";
-        let record = Record::new(table_id, sequence_num, value).unwrap();
+        let record = Record::new(table_id, sequence_num, value);
 
         let encoder = RecordEncoding::newest();
         let mut buf = BytesMut::new();
@@ -260,11 +232,11 @@ mod tests {
     }
 
     #[test]
-    fn test_record_decoding() {
+    fn test_local_wal_record_decoding() {
         let table_id = 1;
         let sequence_num = 2;
         let value = b"test_value";
-        let record = Record::new(table_id, sequence_num, value).unwrap();
+        let record = Record::new(table_id, sequence_num, value);
 
         let encoder = RecordEncoding::newest();
         let mut buf = BytesMut::new();
@@ -274,7 +246,6 @@ mod tests {
 
         assert_eq!(decoded_record.version, record.version);
         assert_eq!(decoded_record.crc, record.crc);
-        assert_eq!(decoded_record.length, record.length);
         assert_eq!(decoded_record.table_id, record.table_id);
         assert_eq!(decoded_record.sequence_num, record.sequence_num);
         assert_eq!(decoded_record.value_length, record.value_length);
diff --git a/src/wal/src/local_storage_impl/segment.rs 
b/src/wal/src/local_storage_impl/segment.rs
index b66701b2..5613021c 100644
--- a/src/wal/src/local_storage_impl/segment.rs
+++ b/src/wal/src/local_storage_impl/segment.rs
@@ -760,9 +760,7 @@ impl Region {
 
         for entry in &batch.entries {
             // Encode the record
-            let record = Record::new(table_id, next_sequence_num, 
&entry.payload)
-                .box_err()
-                .context(Encoding)?;
+            let record = Record::new(table_id, next_sequence_num, 
&entry.payload);
             self.record_encoding
                 .encode(&mut data, &record)
                 .box_err()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to