This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 76a21bcb4 feat(storage): support for sideloading SSTs (#2845)
76a21bcb4 is described below
commit 76a21bcb4b8e7a625fa69a7e67fe915830008190
Author: Luigi Tagliamonte <[email protected]>
AuthorDate: Thu Apr 17 04:45:12 2025 -0700
feat(storage): support for sideloading SSTs (#2845)
Co-authored-by: Aleks Lozovyuk <[email protected]>
Co-authored-by: hulk <[email protected]>
Co-authored-by: Twice <[email protected]>
---
.github/workflows/kvrocks.yaml | 19 +-
src/commands/cmd_server.cc | 52 ++-
src/server/server.h | 7 +
src/storage/storage.cc | 82 +++++
src/storage/storage.h | 5 +
tests/gocase/go.mod | 1 +
tests/gocase/go.sum | 2 +
tests/gocase/unit/sst/sst_load_test.go | 609 +++++++++++++++++++++++++++++++++
x.py | 58 +++-
9 files changed, 827 insertions(+), 8 deletions(-)
diff --git a/.github/workflows/kvrocks.yaml b/.github/workflows/kvrocks.yaml
index 33840a2c1..de4e26752 100644
--- a/.github/workflows/kvrocks.yaml
+++ b/.github/workflows/kvrocks.yaml
@@ -147,6 +147,7 @@ jobs:
os: ubuntu-22.04
compiler: gcc
sonarcloud: -DCMAKE_CXX_FLAGS=--coverage
+ with_coverage: "true"
- name: Ubuntu Clang
os: ubuntu-22.04
compiler: clang
@@ -162,11 +163,13 @@ jobs:
without_jemalloc: -DDISABLE_JEMALLOC=ON
with_sanitizer: -DENABLE_ASAN=ON
compiler: gcc
+ ignore_when_asan: -tags="ignore_when_asan"
- name: Ubuntu Clang ASan
os: ubuntu-22.04
with_sanitizer: -DENABLE_ASAN=ON
without_jemalloc: -DDISABLE_JEMALLOC=ON
compiler: clang
+ ignore_when_asan: -tags="ignore_when_asan"
- name: Ubuntu GCC TSan
os: ubuntu-22.04
without_jemalloc: -DDISABLE_JEMALLOC=ON
@@ -184,6 +187,7 @@ jobs:
with_sanitizer: -DENABLE_UBSAN=ON
without_jemalloc: -DDISABLE_JEMALLOC=ON
compiler: clang
+ ignore_when_ubsan: -tags="ignore_when_ubsan"
- name: Ubuntu GCC Ninja
os: ubuntu-22.04
with_ninja: --ninja
@@ -315,7 +319,11 @@ jobs:
export LSAN_OPTIONS="suppressions=$(realpath
./tests/lsan-suppressions)"
export TSAN_OPTIONS="suppressions=$(realpath
./tests/tsan-suppressions)"
export PATH=$PATH:$HOME/local/bin/
+ export CGO_ENABLED=1
GOCASE_RUN_ARGS=""
+ if [[ -n "${{ matrix.with_coverage }}" ]]; then
+ export WITH_COVERAGE="true"
+ fi
if [[ -n "${{ matrix.with_openssl }}" ]] && [[ "${{ matrix.os }}" ==
ubuntu* ]]; then
git clone https://github.com/jsha/minica
cd minica && git checkout v1.1.0 && go build && cd ..
@@ -325,7 +333,7 @@ jobs:
cp minica.pem tests/gocase/tls/cert/ca.crt
GOCASE_RUN_ARGS="-tlsEnable"
fi
- ./x.py test go build -parallel 2 $GOCASE_RUN_ARGS ${{
matrix.ignore_when_tsan}}
+ ./x.py test go build -parallel 2 $GOCASE_RUN_ARGS ${{
matrix.ignore_when_tsan}} ${{ matrix.ignore_when_asan}} ${{
matrix.ignore_when_ubsan}}
- name: Install redis-py
run: pip3 install redis==5.2.0
@@ -475,7 +483,7 @@ jobs:
- name: Setup openSUSE
if: ${{ startsWith(matrix.image, 'opensuse') }}
run: |
- zypper install -y gcc11 gcc11-c++ make wget git autoconf automake
python3 python3-pip curl tar gzip cmake go
+ zypper install -y gcc11 gcc11-c++ make wget git autoconf automake
python3 python3-pip curl tar gzip cmake go zlib-devel
update-alternatives --install /usr/bin/cc cc /usr/bin/gcc-11 100
update-alternatives --install /usr/bin/c++ c++ /usr/bin/g++-11 100
update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-11 100
@@ -487,7 +495,7 @@ jobs:
run: |
dnf install -y epel-release
dnf config-manager --set-enabled powertools
- dnf install -y git gcc-toolset-12 autoconf automake libtool python3
python3-pip openssl-devel which cmake
+ dnf install -y git gcc-toolset-12 autoconf automake libtool python3
python3-pip openssl-devel which cmake zlib-devel gcc-toolset-12-libstdc++-devel
source /opt/rh/gcc-toolset-12/enable
update-alternatives --install /usr/bin/gcc gcc
/opt/rh/gcc-toolset-12/root/usr/bin/gcc 100
update-alternatives --install /usr/bin/g++ g++
/opt/rh/gcc-toolset-12/root/usr/bin/g++ 100
@@ -500,7 +508,7 @@ jobs:
run: |
dnf install -y epel-release
dnf config-manager --set-enabled crb
- dnf install -y git gcc-toolset-12 autoconf automake libtool python3
python3-pip openssl-devel which cmake
+ dnf install -y git gcc-toolset-12 autoconf automake libtool python3
python3-pip openssl-devel which cmake zlib-devel
source /opt/rh/gcc-toolset-12/enable
update-alternatives --install /usr/bin/gcc gcc
/opt/rh/gcc-toolset-12/root/usr/bin/gcc 100
update-alternatives --install /usr/bin/g++ g++
/opt/rh/gcc-toolset-12/root/usr/bin/g++ 100
@@ -519,7 +527,7 @@ jobs:
if: ${{ startsWith(matrix.image, 'alpine') }}
run: |
apk update
- apk add bash cmake curl git python3 wget make gcc g++ autoconf
linux-headers py3-pip py3-redis
+ apk add bash cmake curl git python3 wget make gcc g++ autoconf
linux-headers py3-pip py3-redis zlib-dev
echo "NPROC=$(nproc)" >> $GITHUB_ENV
- name: Cache redis
@@ -566,6 +574,7 @@ jobs:
- name: Run Go Integration Cases
run: |
export PATH=$PATH:$HOME/local/bin/
+ export CGO_ENABLED=1
GOCASE_RUN_ARGS=""
./x.py test go build -parallel 2 $GOCASE_RUN_ARGS
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index 5fce5ff62..b9d15f35b 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -1370,6 +1370,55 @@ class CommandPollUpdates : public Commander {
Format format_ = Format::Raw;
};
+class CommandSST : public Commander {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ CommandParser parser(args, 1);
+ std::string cmd = GET_OR_RET(parser.TakeStr());
+ if (!util::EqualICase(cmd, "load")) {
+ return {Status::RedisParseErr, "unknown subcommand: " + args[1]};
+ }
+ folder_ = GET_OR_RET(parser.TakeStr());
+ // Parse optional movefiles flag
+ while (parser.Good()) {
+ if (parser.EatEqICase("movefiles")) {
+ std::string move_files = GET_OR_RET(parser.TakeStr());
+ if (util::EqualICase(move_files, "yes")) {
+ ingest_options_.move_files = true;
+ } else if (util::EqualICase(move_files, "no")) {
+ ingest_options_.move_files = false;
+ } else {
+ return {Status::RedisParseErr, "movefiles value must be 'yes' or
'no'"};
+ }
+ } else {
+ return {Status::RedisParseErr, "unknown option: " +
parser.TakeStr().GetValue()};
+ }
+ }
+ return Commander::Parse(args);
+ }
+
+ Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv,
Connection *conn,
+ std::string *output) override {
+ if (srv->GetConfig()->cluster_enabled) {
+ return {Status::NotOK, "The SST command is not supported in cluster
mode."};
+ }
+ if (srv->IsSlave()) {
+ return {Status::NotOK, "Replica nodes do not support the SST command"};
+ }
+ if (srv->GetReplicaCount() != 0) {
+ return {Status::NotOK, "The SST command is not supported when there are
replicas."};
+ }
+ auto s = srv->storage->IngestSST(folder_, ingest_options_);
+ if (!s.IsOK()) return {Status::RedisExecErr, s.Msg()};
+ *output = conn->Map({{redis::BulkString("files_loaded"),
redis::Integer(s.GetValue())}});
+ return Status::OK();
+ }
+
+ private:
+ std::string folder_;
+ rocksdb::IngestExternalFileOptions ingest_options_;
+};
+
REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only
ok-loading auth", NO_KEY),
MakeCmdAttr<CommandPing>("ping", -1, "read-only",
NO_KEY),
MakeCmdAttr<CommandSelect>("select", 2, "read-only",
NO_KEY),
@@ -1410,5 +1459,6 @@ REDIS_REGISTER_COMMANDS(Server,
MakeCmdAttr<CommandAuth>("auth", 2, "read-only o
MakeCmdAttr<CommandReset>("reset", 1, "ok-loading
bypass-multi no-script", NO_KEY),
MakeCmdAttr<CommandApplyBatch>("applybatch", -2,
"write no-multi", NO_KEY),
MakeCmdAttr<CommandDump>("dump", 2, "read-only", 1, 1,
1),
- MakeCmdAttr<CommandPollUpdates>("pollupdates", -2,
"read-only admin", NO_KEY), )
+ MakeCmdAttr<CommandPollUpdates>("pollupdates", -2,
"read-only admin", NO_KEY),
+ MakeCmdAttr<CommandSST>("sst", -3, "write exclusive
admin", 1, 1, 1), )
} // namespace redis
diff --git a/src/server/server.h b/src/server/server.h
index b8c314291..611b51678 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -228,6 +228,13 @@ class Server {
void WakeupBlockingConns(const std::string &key, size_t n_conns);
void OnEntryAddedToStream(const std::string &ns, const std::string &key,
const redis::StreamEntryID &entry_id);
+ size_t GetReplicaCount() {
+ std::lock_guard guard(slave_threads_mu_);
+ return slave_threads_.size();
+ }
+
std::string GetLastRandomKeyCursor();
void SetLastRandomKeyCursor(const std::string &cursor);
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index e6bc881c4..b880f86d7 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -20,6 +20,7 @@
#include "storage.h"
+#include <dirent.h>
#include <event2/buffer.h>
#include <fcntl.h>
#include <glog/logging.h>
@@ -769,6 +770,87 @@ rocksdb::Status Storage::FlushScripts(engine::Context
&ctx, const rocksdb::Write
return Write(ctx, options, batch->GetWriteBatch());
}
+StatusOr<int> Storage::IngestSST(const std::string &sst_dir, const
rocksdb::IngestExternalFileOptions &ingest_options) {
+ std::vector<std::string> sst_files;
+ auto s = env_->GetChildren(sst_dir, &sst_files);
+ if (!s.ok()) {
+ return {Status::NotOK, "Failed to open directory " + sst_dir + ": " +
s.ToString()};
+ }
+
+ std::vector<std::string> filtered_files;
+ for (const auto &filename : sst_files) {
+ if (filename.length() >= 4 && filename.substr(filename.length() - 4) ==
".sst") {
+ filtered_files.push_back(sst_dir + "/" + filename);
+ }
+ }
+ sst_files = std::move(filtered_files);
+
+ if (sst_files.empty()) {
+ LOG(WARNING) << "No SST files found in " << sst_dir;
+ return 0;
+ }
+
+ std::unordered_map<std::string_view, std::vector<std::string>> cf_files;
+ std::vector<std::string> cf_names;
+ for (const auto &cf : ColumnFamilyConfigs::ListAllColumnFamilies()) {
+ cf_names.emplace_back(cf.Name());
+ }
+
+ for (const auto &file : sst_files) {
+ bool matched = false;
+ for (const auto &cf_name : cf_names) {
+ if (file.find(cf_name) != std::string::npos) {
+ cf_files[cf_name].push_back(file);
+ matched = true;
+ break;
+ }
+ }
+ if (!matched) {
+ return {Status::NotOK, fmt::format("SST file '{}' does not match any
known column family name", file)};
+ }
+ }
+
+ // Process each set of files with the appropriate column family
+ // By importing the specific column family SST files first, we avoid data
corruption -
+ // if import fails, no data is made available or corrupted in either column
family
+ // if the metadata import fails, the imported data will be deleted by the
compaction.
+ rocksdb::Status status;
+ // Process files for each column family except metadata
+ for (const auto &[cf_name, files] : cf_files) {
+ if (cf_name == kMetadataColumnFamilyName) continue;
+ if (files.empty()) continue;
+
+ rocksdb::ColumnFamilyHandle *cf_handle = nullptr;
+ // Find the correct CF handle
+ for (auto handle : cf_handles_) {
+ if (handle->GetName() == cf_name) {
+ cf_handle = handle;
+ break;
+ }
+ }
+
+ status = ingestSST(cf_handle, ingest_options, files);
+ if (!status.ok()) {
+ return {Status::NotOK, status.ToString()};
+ }
+ }
+ // Process metadata files
+ const auto &metadata_files = cf_files[kMetadataColumnFamilyName];
+ if (!metadata_files.empty()) {
+ status = ingestSST(GetCFHandle(ColumnFamilyID::Metadata), ingest_options,
metadata_files);
+ if (!status.ok()) {
+ return {Status::NotOK, status.ToString()};
+ }
+ }
+ return sst_files.size();
+}
+
+rocksdb::Status Storage::ingestSST(rocksdb::ColumnFamilyHandle *cf_handle,
+ const rocksdb::IngestExternalFileOptions
&options,
+ const std::vector<std::string>
&sst_file_names) {
+ return db_->IngestExternalFile(cf_handle, sst_file_names, options);
+}
+
Status Storage::ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch) {
return applyWriteBatch(default_write_opts_, batch);
}
diff --git a/src/storage/storage.h b/src/storage/storage.h
index a4563eabc..71d8b9537 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -270,6 +270,9 @@ class Storage {
[[nodiscard]] rocksdb::Status Compact(rocksdb::ColumnFamilyHandle *cf, const
rocksdb::Slice *begin,
const rocksdb::Slice *end);
+ [[nodiscard]] StatusOr<int> IngestSST(const std::string &folder,
+ const
rocksdb::IngestExternalFileOptions &ingest_options);
+
rocksdb::DB *GetDB();
bool IsClosing() const { return db_closing_; }
std::string GetName() const { return config_->db_name; }
@@ -389,6 +392,8 @@ class Storage {
rocksdb::Status writeToDB(engine::Context &ctx, const rocksdb::WriteOptions
&options, rocksdb::WriteBatch *updates);
void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family,
const rocksdb::Status &s);
Status applyWriteBatch(const rocksdb::WriteOptions &options,
rocksdb::WriteBatch *batch);
+ rocksdb::Status ingestSST(rocksdb::ColumnFamilyHandle *cf_handle, const
rocksdb::IngestExternalFileOptions &options,
+ const std::vector<std::string> &sst_file_names);
};
/// Context passes fixed snapshot and batch between APIs
diff --git a/tests/gocase/go.mod b/tests/gocase/go.mod
index f695ea41c..4160e3919 100644
--- a/tests/gocase/go.mod
+++ b/tests/gocase/go.mod
@@ -15,6 +15,7 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f //
indirect
github.com/ebitengine/purego v0.8.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
+ github.com/linxGnu/grocksdb v1.9.9
github.com/lufia/plan9stats v0.0.0-20250303091104-876f3ea5145d //
indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 //
indirect
diff --git a/tests/gocase/go.sum b/tests/gocase/go.sum
index 62d2d97e2..1dc47e066 100644
--- a/tests/gocase/go.sum
+++ b/tests/gocase/go.sum
@@ -15,6 +15,8 @@ github.com/go-ole/go-ole v1.3.0
h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod
h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod
h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
+github.com/linxGnu/grocksdb v1.9.9
h1:CzSS/vHLtVIdxdrjvqWR/sm93u/0eB6UcoO14YqObnw=
+github.com/linxGnu/grocksdb v1.9.9/go.mod
h1:C3CNe9UYc9hlEM2pC82AqiGS3LRW537u9LFV4wIZuHk=
github.com/lufia/plan9stats v0.0.0-20250303091104-876f3ea5145d
h1:fjMbDVUGsMQiVZnSQsmouYJvMdwsGiDipOZoN66v844=
github.com/lufia/plan9stats v0.0.0-20250303091104-876f3ea5145d/go.mod
h1:autxFIvghDt3jPTLoqZ9OZ7s9qTGNAWmYCjVFWPX/zg=
github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
diff --git a/tests/gocase/unit/sst/sst_load_test.go
b/tests/gocase/unit/sst/sst_load_test.go
new file mode 100644
index 000000000..cd0e264a6
--- /dev/null
+++ b/tests/gocase/unit/sst/sst_load_test.go
@@ -0,0 +1,609 @@
+//go:build !(ignore_when_tsan || ignore_when_asan || ignore_when_ubsan)
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package sst
+
+import (
+ "context"
+ "encoding/binary"
+ "fmt"
+ "log"
+ "math/rand"
+ "os"
+ "path/filepath"
+ "sort"
+ "testing"
+ "time"
+
+ "github.com/linxGnu/grocksdb"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
+)
+
+const (
+ DefaultKvrocksNamespace = "__namespace"
+ metaDataEncodingMask uint8 = 0x82
+ versionCounterBits = 11
+)
+
+type Metadata struct {
+ Flags uint8
+ Expire uint64
+ Version uint64
+ Size uint64
+}
+
+type SSTResponse struct {
+ filesLoaded int64 `redis:"files_loaded"`
+}
+
+func NewMetadata() *Metadata {
+ src := rand.NewSource(time.Now().UnixNano())
+ r := rand.New(src)
+
+ timestamp := uint64(time.Now().UnixMicro())
+ counter := uint64(r.Int63())
+ version := (timestamp << versionCounterBits) + (counter % (1 <<
versionCounterBits))
+
+ return &Metadata{
+ Flags: metaDataEncodingMask,
+ Version: version,
+ }
+}
+
+func (m *Metadata) Encode() []byte {
+ buf := make([]byte, 25) // 1 + 8 + 8 + 8 bytes
+ buf[0] = m.Flags
+ binary.BigEndian.PutUint64(buf[1:], m.Expire)
+ binary.BigEndian.PutUint64(buf[9:], m.Version)
+ binary.BigEndian.PutUint64(buf[17:], m.Size)
+ return buf
+}
+
+func encodeInternalKey(namespace, key, field string, version uint64) []byte {
+ nsLen := len(namespace)
+ keyLen := len(key)
+ fieldLen := len(field)
+
+ // Pre-calculate total size: 1 byte for ns size + ns + 4 bytes for key
size + key + 8 bytes for version + field
+ out := make([]byte, 1+nsLen+4+keyLen+8+fieldLen)
+
+ out[0] = uint8(nsLen)
+ copy(out[1:], namespace)
+ binary.BigEndian.PutUint32(out[1+nsLen:], uint32(keyLen))
+ copy(out[5+nsLen:], key)
+ binary.BigEndian.PutUint64(out[5+nsLen+keyLen:], version)
+ copy(out[13+nsLen+keyLen:], field)
+
+ return out
+}
+
+func encodeRedisHashKey(namespace, userKey string) []byte {
+ totalLen := 1 + len(namespace) + len(userKey)
+ buf := make([]byte, totalLen)
+ buf[0] = uint8(len(namespace))
+ copy(buf[1:], namespace)
+ copy(buf[1+len(namespace):], userKey)
+ return buf
+}
+
+func createSSTFile(filename string, data map[string]string) error {
+ envOpts := grocksdb.NewDefaultEnvOptions()
+ sstWriterOpts := grocksdb.NewDefaultOptions()
+ sstWriterOpts.SetCompression(grocksdb.CompressionType(2))
+ sstWriter := grocksdb.NewSSTFileWriter(envOpts, sstWriterOpts)
+ defer sstWriter.Destroy()
+
+ err := sstWriter.Open(filename)
+ if err != nil {
+ return fmt.Errorf("failed to open SST file writer: %v", err)
+ }
+
+ // Get all keys and sort them
+ keys := make([]string, 0, len(data))
+ for k := range data {
+ keys = append(keys, k)
+ }
+ sort.Strings(keys)
+
+ // Add keys in sorted order
+ for _, k := range keys {
+ err = sstWriter.Add([]byte(k), []byte(data[k]))
+ if err != nil {
+ log.Printf("Error adding key %s to SST: %v", k, err)
+ }
+ }
+
+ err = sstWriter.Finish()
+ if err != nil {
+ return fmt.Errorf("failed to finish SST file: %v", err)
+ }
+ return nil
+}
+
+func makeTempDir() (string, error) {
+ return os.MkdirTemp("", "sst_test_*")
+}
+
+func toInt64(val interface{}) (int64, error) {
+ switch v := val.(type) {
+ case int64:
+ return v, nil
+ case int:
+ return int64(v), nil
+ case float64:
+ return int64(v), nil
+ default:
+ return 0, fmt.Errorf("value is not a number, got %T", val)
+ }
+}
+
+func ExtractSSTResponse(result interface{}) (*SSTResponse, error) {
+ resultMap, ok := result.(map[interface{}]interface{})
+ if !ok {
+ return nil, fmt.Errorf("expected map[interface{}]interface{},
got %T", result)
+ }
+ response := &SSTResponse{}
+ for field, target := range map[string]*int64{
+ "files_loaded": &response.filesLoaded,
+ } {
+ if val, ok := resultMap[field]; ok {
+ converted, err := toInt64(val)
+ if err != nil {
+ return nil, fmt.Errorf("%s: %v", field, err)
+ }
+ *target = converted
+ }
+ }
+ return response, nil
+}
+
+func TestSSTLoad(t *testing.T) {
+ configOptions := []util.ConfigOptions{
+ {
+ Name: "resp3-enabled",
+ Options: []string{"yes"},
+ ConfigType: util.YesNo,
+ },
+ }
+ configsMatrix, err := util.GenerateConfigsMatrix(configOptions)
+ require.NoError(t, err)
+ for _, configs := range configsMatrix {
+ testSSTLoad(t, configs)
+ }
+}
+
+var testSSTLoad = func(t *testing.T, configs util.KvrocksServerConfigs) {
+ srv := util.StartServer(t, configs)
+ defer srv.Close()
+ ctx := context.Background()
+ rdb := srv.NewClient()
+ defer func() { require.NoError(t, rdb.Close()) }()
+
+ t.Run("Test load cmd with no folder", func(t *testing.T) {
+ r := rdb.Do(ctx, "sst", "load")
+ assert.Error(t, r.Err())
+ })
+
+ t.Run("Test wrong subcommand", func(t *testing.T) {
+ dir, err := makeTempDir()
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+ r := rdb.Do(ctx, "sst", "wrong-sub-command", dir)
+ assert.Error(t, r.Err())
+ })
+
+ t.Run("Test wrong load option", func(t *testing.T) {
+ dir, err := makeTempDir()
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+ r := rdb.Do(ctx, "sst", "load", dir, "wrong-load-option")
+ assert.Error(t, r.Err())
+ })
+
+ t.Run("Test empty folder", func(t *testing.T) {
+ dir, err := makeTempDir()
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+ r := rdb.Do(ctx, "sst", "load", dir)
+ assert.NoError(t, r.Err())
+ resp, err := ExtractSSTResponse(r.Val())
+ assert.NoError(t, err)
+ assert.Equal(t, int64(0), resp.filesLoaded)
+ })
+
+ t.Run("Test load redis hash keys", func(t *testing.T) {
+ dir, err := makeTempDir()
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ namespace := DefaultKvrocksNamespace
+ data := map[string][]map[string]string{
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ }
+ keys := make(map[string]string, len(data))
+ metaKeys := make(map[string]string, len(data))
+ for hashK := range data {
+ meta := NewMetadata()
+ fields := data[hashK]
+ for _, field := range fields {
+ for fieldK, fieldV := range field {
+ internalKey :=
encodeInternalKey(namespace, hashK, fieldK, meta.Version)
+ keys[string(internalKey)] = fieldV
+ }
+ }
+ hashKey := encodeRedisHashKey(namespace, hashK)
+ meta.Size++
+ metaKeys[string(hashKey)] = string(meta.Encode())
+ }
+
+ err = createSSTFile(filepath.Join(dir, "kvrocks_default.sst"),
keys)
+ assert.NoError(t, err)
+ err = createSSTFile(filepath.Join(dir, "kvrocks_metadata.sst"),
metaKeys)
+ assert.NoError(t, err)
+
+ r := rdb.Do(ctx, "sst", "load", dir)
+ assert.NoError(t, r.Err())
+ resp, err := ExtractSSTResponse(r.Val())
+ assert.NoError(t, err)
+ assert.Equal(t, int64(2), resp.filesLoaded)
+
+ //verify files didn't get moved
+ _, err = os.Stat(filepath.Join(dir, "kvrocks_default.sst"))
+ assert.NoError(t, err)
+ _, err = os.Stat(filepath.Join(dir, "kvrocks_metadata.sst"))
+ assert.NoError(t, err)
+
+ for hashK, fields := range data {
+ for _, field := range fields {
+ for fieldK, expectedVal := range field {
+ val := rdb.HGet(ctx, hashK, fieldK)
+ assert.NoError(t, val.Err())
+ assert.Equal(t, expectedVal, val.Val(),
"Hash field value mismatch for key:%s field:%s", hashK, fieldK)
+ }
+ }
+ }
+ })
+
+ t.Run("Test load and update redis hash keys", func(t *testing.T) {
+ dir, err := makeTempDir()
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ namespace := DefaultKvrocksNamespace
+ data := map[string][]map[string]string{
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ }
+ keys := make(map[string]string, len(data))
+ metaKeys := make(map[string]string, len(data))
+ for hashK := range data {
+ meta := NewMetadata()
+ fields := data[hashK]
+ for _, field := range fields {
+ for fieldK, fieldV := range field {
+ internalKey :=
encodeInternalKey(namespace, hashK, fieldK, meta.Version)
+ keys[string(internalKey)] = fieldV
+ }
+ }
+ hashKey := encodeRedisHashKey(namespace, hashK)
+ meta.Size++
+ metaKeys[string(hashKey)] = string(meta.Encode())
+ }
+
+ err = createSSTFile(filepath.Join(dir, "kvrocks_default.sst"),
keys)
+ assert.NoError(t, err)
+ err = createSSTFile(filepath.Join(dir, "kvrocks_metadata.sst"),
metaKeys)
+ assert.NoError(t, err)
+
+ r := rdb.Do(ctx, "sst", "load", dir)
+ assert.NoError(t, r.Err())
+ resp, err := ExtractSSTResponse(r.Val())
+ assert.NoError(t, err)
+ assert.Equal(t, int64(2), resp.filesLoaded)
+
+ for hashK, fields := range data {
+ for _, field := range fields {
+ for fieldK, expectedVal := range field {
+ val := rdb.HGet(ctx, hashK, fieldK)
+ assert.NoError(t, val.Err())
+ assert.Equal(t, expectedVal, val.Val(),
"Hash field value mismatch for key:%s field:%s", hashK, fieldK)
+ }
+ }
+ }
+ // update the keys fields via redis interface
+ for hashK := range data {
+ fields := data[hashK]
+ for _, field := range fields {
+ for fieldK := range field {
+ newVal := "__avoid_collisions__" +
util.RandString(1, 10, util.Alpha)
+ r := rdb.HSet(ctx, hashK, fieldK,
newVal)
+ assert.NoError(t, r.Err())
+ field[fieldK] = newVal
+ }
+ }
+ }
+ // validate the updates made via redis interface
+ for hashK, fields := range data {
+ for _, field := range fields {
+ for fieldK, expectedVal := range field {
+ val := rdb.HGet(ctx, hashK, fieldK)
+ assert.NoError(t, val.Err())
+ assert.Equal(t, expectedVal, val.Val(),
"Hash field value mismatch for key:%s field:%s", hashK, fieldK)
+ }
+ }
+ }
+ // update the values and sst load
+ for hashK := range data {
+ fields := data[hashK]
+ for _, field := range fields {
+ for fieldK := range field {
+ newVal := "__avoid_collisions__" +
util.RandString(1, 10, util.Alpha)
+ field[fieldK] = newVal
+ }
+ }
+ }
+
+ keys = make(map[string]string, len(data))
+ metaKeys = make(map[string]string, len(data))
+ for hashK := range data {
+ meta := NewMetadata()
+ fields := data[hashK]
+ for _, field := range fields {
+ for fieldK, fieldV := range field {
+ internalKey :=
encodeInternalKey(namespace, hashK, fieldK, meta.Version)
+ keys[string(internalKey)] = fieldV
+ }
+ }
+ hashKey := encodeRedisHashKey(namespace, hashK)
+ meta.Size++
+ metaKeys[string(hashKey)] = string(meta.Encode())
+ }
+
+ err = createSSTFile(filepath.Join(dir, "kvrocks_default.sst"),
keys)
+ assert.NoError(t, err)
+ err = createSSTFile(filepath.Join(dir, "kvrocks_metadata.sst"),
metaKeys)
+ assert.NoError(t, err)
+
+ r = rdb.Do(ctx, "sst", "load", dir)
+ assert.NoError(t, r.Err())
+ resp, err = ExtractSSTResponse(r.Val())
+ assert.NoError(t, err)
+ assert.Equal(t, int64(2), resp.filesLoaded)
+
+ for hashK, fields := range data {
+ for _, field := range fields {
+ for fieldK, expectedVal := range field {
+ val := rdb.HGet(ctx, hashK, fieldK)
+ assert.NoError(t, val.Err())
+ assert.Equal(t, expectedVal, val.Val(),
"Hash field value mismatch for key:%s field:%s", hashK, fieldK)
+ }
+ }
+ }
+ })
+
+ t.Run("Test load redis hash keys with move option", func(t *testing.T) {
+ dir, err := makeTempDir()
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ namespace := DefaultKvrocksNamespace
+ data := map[string][]map[string]string{
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ }
+ keys := make(map[string]string, len(data))
+ metaKeys := make(map[string]string, len(data))
+ for hashK := range data {
+ meta := NewMetadata()
+ fields := data[hashK]
+ for _, field := range fields {
+ for fieldK, fieldV := range field {
+ internalKey :=
encodeInternalKey(namespace, hashK, fieldK, meta.Version)
+ keys[string(internalKey)] = fieldV
+ }
+ }
+ hashKey := encodeRedisHashKey(namespace, hashK)
+ meta.Size++
+ metaKeys[string(hashKey)] = string(meta.Encode())
+ }
+
+ err = createSSTFile(filepath.Join(dir, "kvrocks_default.sst"),
keys)
+ assert.NoError(t, err)
+ err = createSSTFile(filepath.Join(dir, "kvrocks_metadata.sst"),
metaKeys)
+ assert.NoError(t, err)
+
+ r := rdb.Do(ctx, "sst", "load", dir, "movefiles", "yes")
+ assert.NoError(t, r.Err())
+ resp, err := ExtractSSTResponse(r.Val())
+ assert.NoError(t, err)
+ assert.Equal(t, int64(2), resp.filesLoaded)
+
+ //verify files did get moved
+ _, err = os.Stat(filepath.Join(dir, "kvrocks_default.sst"))
+ assert.True(t, os.IsNotExist(err))
+ _, err = os.Stat(filepath.Join(dir, "kvrocks_metadata.sst"))
+ assert.True(t, os.IsNotExist(err))
+
+ for hashK, fields := range data {
+ for _, field := range fields {
+ for fieldK, expectedVal := range field {
+ val := rdb.HGet(ctx, hashK, fieldK)
+ assert.NoError(t, val.Err())
+ assert.Equal(t, expectedVal, val.Val(),
"Hash field value mismatch for key:%s field:%s", hashK, fieldK)
+ }
+ }
+ }
+ })
+
+ t.Run("Test load redis hash keys with no metadata entries", func(t
*testing.T) {
+ dir, err := makeTempDir()
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ namespace := DefaultKvrocksNamespace
+ data := map[string][]map[string]string{
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ }
+ keys := make(map[string]string, len(data))
+ metaKeys := make(map[string]string, len(data))
+ for hashK := range data {
+ meta := NewMetadata()
+ fields := data[hashK]
+ for _, field := range fields {
+ for fieldK, fieldV := range field {
+ internalKey :=
encodeInternalKey(namespace, hashK, fieldK, meta.Version)
+ keys[string(internalKey)] = fieldV
+ }
+ }
+ hashKey := encodeRedisHashKey(namespace, hashK)
+ meta.Size++
+ metaKeys[string(hashKey)] = string(meta.Encode())
+ }
+
+ err = createSSTFile(filepath.Join(dir, "kvrocks_default.sst"),
keys)
+ assert.NoError(t, err)
+
+ r := rdb.Do(ctx, "sst", "load", dir)
+ assert.NoError(t, r.Err())
+ resp, err := ExtractSSTResponse(r.Val())
+ assert.NoError(t, err)
+ assert.Equal(t, int64(1), resp.filesLoaded)
+
+ for hashK, fields := range data {
+ for _, field := range fields {
+ for fieldK := range field {
+ val := rdb.HGet(ctx, hashK, fieldK)
+ assert.Error(t, val.Err())
+ }
+ }
+ }
+ })
+
+ t.Run("Test load redis hash keys with expiration", func(t *testing.T) {
+ dir, err := makeTempDir()
+ require.NoError(t, err)
+ defer os.RemoveAll(dir)
+
+ namespace := DefaultKvrocksNamespace
+ data := map[string][]map[string]string{
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ "__avoid_collisions__" + util.RandString(1, 10,
util.Alpha): {
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ {"__avoid_collisions__" + util.RandString(1,
10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)},
+ },
+ }
+ keys := make(map[string]string, len(data))
+ metaKeys := make(map[string]string, len(data))
+ for hashK := range data {
+ meta := NewMetadata()
+ meta.Expire = uint64(time.Now().Add(5 *
time.Second).UnixMilli())
+ fields := data[hashK]
+ for _, field := range fields {
+ for fieldK, fieldV := range field {
+ internalKey :=
encodeInternalKey(namespace, hashK, fieldK, meta.Version)
+ keys[string(internalKey)] = fieldV
+ }
+ }
+ hashKey := encodeRedisHashKey(namespace, hashK)
+ meta.Size++
+ metaKeys[string(hashKey)] = string(meta.Encode())
+ }
+
+ err = createSSTFile(filepath.Join(dir, "kvrocks_default.sst"),
keys)
+ assert.NoError(t, err)
+ err = createSSTFile(filepath.Join(dir, "kvrocks_metadata.sst"),
metaKeys)
+ assert.NoError(t, err)
+
+ r := rdb.Do(ctx, "sst", "load", dir)
+ assert.NoError(t, r.Err())
+ resp, err := ExtractSSTResponse(r.Val())
+ assert.NoError(t, err)
+ assert.Equal(t, int64(2), resp.filesLoaded)
+
+ // verify keys have expiration
+ for hashK := range data {
+ expireDuration := rdb.ExpireTime(ctx, hashK)
+ assert.NotEmpty(t, expireDuration.Val().Milliseconds())
+ }
+ //verify keys have expired
+ time.Sleep(5 * time.Second)
+ for hashK, fields := range data {
+ for _, field := range fields {
+ for fieldK := range field {
+ val := rdb.HGet(ctx, hashK, fieldK)
+ assert.Error(t, val.Err())
+ }
+ }
+ }
+ })
+}
diff --git a/x.py b/x.py
index deb3b4523..5ab66c07c 100755
--- a/x.py
+++ b/x.py
@@ -223,6 +223,60 @@ def clang_tidy(dir: str, jobs: Optional[int],
clang_tidy_path: str, run_clang_ti
run(run_command, *options, *regexes, verbose=True, cwd=basedir)
def is_rocky_linux(path='/etc/os-release'):
    """Return True when the host is Rocky Linux with major version 8 or 9.

    Args:
        path: os-release file to inspect. Defaults to /etc/os-release;
            parameterized (backward-compatibly) so tests can point it at
            a fixture file.

    Returns:
        bool: True only when ID is 'rocky' and the major VERSION_ID is
        '8' or '9'; False on any parse/read failure.
    """
    try:
        data = {}
        with open(path) as f:
            for line in f:
                if '=' in line:
                    key, value = line.strip().split('=', 1)
                    # os-release values may be wrapped in double OR single
                    # quotes (freedesktop.org os-release spec); strip both.
                    data[key] = value.strip('"\'')

        is_rocky = data.get('ID') == 'rocky'
        # Compare only the major version: "9.3" -> "9".
        major_version = data.get('VERSION_ID', '').split('.')[0]

        return is_rocky and major_version in ('8', '9')
    except FileNotFoundError:
        print(f"File {path} not found.")
        return False
    except Exception as e:
        # Best-effort probe: any unexpected failure means "not Rocky".
        print(f"Unexpected error: {e}")
        return False
+
+
def get_custom_env():
    """Build the environment for go test/lint runs that link against cgo.

    Points CGO_CFLAGS/CGO_LDFLAGS at the in-tree RocksDB build and its
    compression dependencies under build/_deps, so the gocase suite can
    compile its cgo-backed SST helpers.

    Returns:
        dict: a copy of os.environ with CGO_CFLAGS and CGO_LDFLAGS set.
    """
    base = Path(__file__).parent.absolute()
    rocksdb_inc = base.joinpath('build/_deps/rocksdb-src/include')
    rocksdb_lib = base.joinpath('build/_deps/rocksdb-build')
    zstd_lib = base.joinpath('build/_deps/zstd-src/lib')
    lz4_lib = base.joinpath('build/_deps/lz4-src/lib')
    snappy_lib = base.joinpath('build/_deps/snappy-build')

    extra_flags = []
    env = os.environ.copy()

    if is_rocky_linux():
        # gcc-toolset's libstdc++ is not on the default linker search path
        # on Rocky; locate the 64-bit copy and link it explicitly
        # (grep -v 32 drops the 32-bit variant).
        output = run_pipe("find", "/opt/rh/gcc-toolset-12/root/", "-name",
                          "libstdc++.so*")
        output = run_pipe("grep", "-v", "32", stdin=output)
        output = run_pipe("xargs", "dirname", stdin=output)
        libstdc_folder = output.read().strip() + "/"
        if libstdc_folder != "/":
            extra_flags.append(f"-L{libstdc_folder} -lstdc++")

    if env.get("WITH_COVERAGE", "false") == "true":
        # Kept as a separate list entry: the old string concatenation
        # produced the single bogus token "-lstdc++-lgcov" on Rocky.
        extra_flags.append("-lgcov")

    additional_flags = " ".join(extra_flags)
    env.update({
        "CGO_CFLAGS": f"-I{rocksdb_inc}",
        "CGO_LDFLAGS": f"-L{rocksdb_lib} -L{zstd_lib} -L{lz4_lib} "
                       f"-L{snappy_lib} {additional_flags}",
    })
    return env
+
+
def golangci_lint(golangci_lint_path: str) -> None:
def get_gopath() -> Tuple[Path, Path]:
go = find_command('go', msg='go is required for testing')
@@ -258,7 +312,7 @@ def golangci_lint(golangci_lint_path: str) -> None:
check_version(version_str, GOLANGCI_LINT_REQUIRED_VERSION,
"golangci-lint")
basedir = Path(__file__).parent.absolute() / 'tests' / 'gocase'
- run(binpath_str, 'run', '-v', './...', cwd=str(basedir), verbose=True)
+ run(binpath_str, 'run', '-v', './...', cwd=str(basedir), verbose=True,
env=get_custom_env())
def write_version(release_version: str) -> str:
@@ -322,7 +376,7 @@ def test_go(dir: str, cli_path: str, rest: List[str]) ->
None:
*rest
]
- run(go, *args, cwd=str(basedir), verbose=True)
+ run(go, *args, cwd=str(basedir), verbose=True, env=get_custom_env())
if __name__ == '__main__':