This is an automated email from the ASF dual-hosted git repository. hulk pushed a commit to branch unstable in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push: new 76a21bcb4 feat(storage): support for sideloading SSTs (#2845) 76a21bcb4 is described below commit 76a21bcb4b8e7a625fa69a7e67fe915830008190 Author: Luigi Tagliamonte <51684360+ltagliamonte...@users.noreply.github.com> AuthorDate: Thu Apr 17 04:45:12 2025 -0700 feat(storage): support for sideloading SSTs (#2845) Co-authored-by: Aleks Lozovyuk <aleks.rai...@gmail.com> Co-authored-by: hulk <hulk.webs...@gmail.com> Co-authored-by: Twice <twice.m...@gmail.com> --- .github/workflows/kvrocks.yaml | 19 +- src/commands/cmd_server.cc | 52 ++- src/server/server.h | 7 + src/storage/storage.cc | 82 +++++ src/storage/storage.h | 5 + tests/gocase/go.mod | 1 + tests/gocase/go.sum | 2 + tests/gocase/unit/sst/sst_load_test.go | 609 +++++++++++++++++++++++++++++++++ x.py | 58 +++- 9 files changed, 827 insertions(+), 8 deletions(-) diff --git a/.github/workflows/kvrocks.yaml b/.github/workflows/kvrocks.yaml index 33840a2c1..de4e26752 100644 --- a/.github/workflows/kvrocks.yaml +++ b/.github/workflows/kvrocks.yaml @@ -147,6 +147,7 @@ jobs: os: ubuntu-22.04 compiler: gcc sonarcloud: -DCMAKE_CXX_FLAGS=--coverage + with_coverage: "true" - name: Ubuntu Clang os: ubuntu-22.04 compiler: clang @@ -162,11 +163,13 @@ jobs: without_jemalloc: -DDISABLE_JEMALLOC=ON with_sanitizer: -DENABLE_ASAN=ON compiler: gcc + ignore_when_asan: -tags="ignore_when_asan" - name: Ubuntu Clang ASan os: ubuntu-22.04 with_sanitizer: -DENABLE_ASAN=ON without_jemalloc: -DDISABLE_JEMALLOC=ON compiler: clang + ignore_when_asan: -tags="ignore_when_asan" - name: Ubuntu GCC TSan os: ubuntu-22.04 without_jemalloc: -DDISABLE_JEMALLOC=ON @@ -184,6 +187,7 @@ jobs: with_sanitizer: -DENABLE_UBSAN=ON without_jemalloc: -DDISABLE_JEMALLOC=ON compiler: clang + ignore_when_ubsan: -tags="ignore_when_ubsan" - name: Ubuntu GCC Ninja os: ubuntu-22.04 with_ninja: --ninja @@ -315,7 +319,11 @@ jobs: export LSAN_OPTIONS="suppressions=$(realpath 
./tests/lsan-suppressions)" export TSAN_OPTIONS="suppressions=$(realpath ./tests/tsan-suppressions)" export PATH=$PATH:$HOME/local/bin/ + export CGO_ENABLED=1 GOCASE_RUN_ARGS="" + if [[ -n "${{ matrix.with_coverage }}" ]]; then + export WITH_COVERAGE="true" + fi if [[ -n "${{ matrix.with_openssl }}" ]] && [[ "${{ matrix.os }}" == ubuntu* ]]; then git clone https://github.com/jsha/minica cd minica && git checkout v1.1.0 && go build && cd .. @@ -325,7 +333,7 @@ jobs: cp minica.pem tests/gocase/tls/cert/ca.crt GOCASE_RUN_ARGS="-tlsEnable" fi - ./x.py test go build -parallel 2 $GOCASE_RUN_ARGS ${{ matrix.ignore_when_tsan}} + ./x.py test go build -parallel 2 $GOCASE_RUN_ARGS ${{ matrix.ignore_when_tsan}} ${{ matrix.ignore_when_asan}} ${{ matrix.ignore_when_ubsan}} - name: Install redis-py run: pip3 install redis==5.2.0 @@ -475,7 +483,7 @@ jobs: - name: Setup openSUSE if: ${{ startsWith(matrix.image, 'opensuse') }} run: | - zypper install -y gcc11 gcc11-c++ make wget git autoconf automake python3 python3-pip curl tar gzip cmake go + zypper install -y gcc11 gcc11-c++ make wget git autoconf automake python3 python3-pip curl tar gzip cmake go zlib-devel update-alternatives --install /usr/bin/cc cc /usr/bin/gcc-11 100 update-alternatives --install /usr/bin/c++ c++ /usr/bin/g++-11 100 update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-11 100 @@ -487,7 +495,7 @@ jobs: run: | dnf install -y epel-release dnf config-manager --set-enabled powertools - dnf install -y git gcc-toolset-12 autoconf automake libtool python3 python3-pip openssl-devel which cmake + dnf install -y git gcc-toolset-12 autoconf automake libtool python3 python3-pip openssl-devel which cmake zlib-devel gcc-toolset-12-libstdc++-devel source /opt/rh/gcc-toolset-12/enable update-alternatives --install /usr/bin/gcc gcc /opt/rh/gcc-toolset-12/root/usr/bin/gcc 100 update-alternatives --install /usr/bin/g++ g++ /opt/rh/gcc-toolset-12/root/usr/bin/g++ 100 @@ -500,7 +508,7 @@ jobs: run: | dnf install -y 
epel-release dnf config-manager --set-enabled crb - dnf install -y git gcc-toolset-12 autoconf automake libtool python3 python3-pip openssl-devel which cmake + dnf install -y git gcc-toolset-12 autoconf automake libtool python3 python3-pip openssl-devel which cmake zlib-devel source /opt/rh/gcc-toolset-12/enable update-alternatives --install /usr/bin/gcc gcc /opt/rh/gcc-toolset-12/root/usr/bin/gcc 100 update-alternatives --install /usr/bin/g++ g++ /opt/rh/gcc-toolset-12/root/usr/bin/g++ 100 @@ -519,7 +527,7 @@ jobs: if: ${{ startsWith(matrix.image, 'alpine') }} run: | apk update - apk add bash cmake curl git python3 wget make gcc g++ autoconf linux-headers py3-pip py3-redis + apk add bash cmake curl git python3 wget make gcc g++ autoconf linux-headers py3-pip py3-redis zlib-dev echo "NPROC=$(nproc)" >> $GITHUB_ENV - name: Cache redis @@ -566,6 +574,7 @@ jobs: - name: Run Go Integration Cases run: | export PATH=$PATH:$HOME/local/bin/ + export CGO_ENABLED=1 GOCASE_RUN_ARGS="" ./x.py test go build -parallel 2 $GOCASE_RUN_ARGS diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index 5fce5ff62..b9d15f35b 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -1370,6 +1370,55 @@ class CommandPollUpdates : public Commander { Format format_ = Format::Raw; }; +class CommandSST : public Commander { + public: + Status Parse(const std::vector<std::string> &args) override { + CommandParser parser(args, 1); + std::string cmd = GET_OR_RET(parser.TakeStr()); + if (!util::EqualICase(cmd, "load")) { + return {Status::RedisParseErr, "unknown subcommand:" + args[1]}; + } + folder_ = GET_OR_RET(parser.TakeStr()); + // Parse optional movefiles flag + while (parser.Good()) { + if (parser.EatEqICase("movefiles")) { + std::string move_files = GET_OR_RET(parser.TakeStr()); + if (util::EqualICase(move_files, "yes")) { + ingest_options_.move_files = true; + } else if (util::EqualICase(move_files, "no")) { + ingest_options_.move_files = false; + } else { 
+ return {Status::RedisParseErr, "movefiles value must be 'yes' or 'no'"}; + } + } else { + return {Status::RedisParseErr, "unknown option: " + parser.TakeStr().GetValue()}; + } + } + return Commander::Parse(args); + } + + Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, [[maybe_unused]] Connection *conn, + std::string *output) override { + if (srv->GetConfig()->cluster_enabled) { + return {Status::NotOK, "The SST command is not supported in cluster mode."}; + } + if (srv->IsSlave()) { + return {Status::NotOK, "Replica nodes do not support the SST command"}; + } + if (srv->GetReplicaCount() != 0) { + return {Status::NotOK, "The SST command is not supported when there are replicas."}; + } + auto s = srv->storage->IngestSST(folder_, ingest_options_); + if (!s.IsOK()) return {Status::RedisExecErr, s.Msg()}; + *output = conn->Map({{redis::BulkString("files_loaded"), redis::Integer(s.GetValue())}}); + return Status::OK(); + } + + private: + std::string folder_; + rocksdb::IngestExternalFileOptions ingest_options_; +}; + REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loading auth", NO_KEY), MakeCmdAttr<CommandPing>("ping", -1, "read-only", NO_KEY), MakeCmdAttr<CommandSelect>("select", 2, "read-only", NO_KEY), @@ -1410,5 +1459,6 @@ REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only o MakeCmdAttr<CommandReset>("reset", 1, "ok-loading bypass-multi no-script", NO_KEY), MakeCmdAttr<CommandApplyBatch>("applybatch", -2, "write no-multi", NO_KEY), MakeCmdAttr<CommandDump>("dump", 2, "read-only", 1, 1, 1), - MakeCmdAttr<CommandPollUpdates>("pollupdates", -2, "read-only admin", NO_KEY), ) + MakeCmdAttr<CommandPollUpdates>("pollupdates", -2, "read-only admin", NO_KEY), + MakeCmdAttr<CommandSST>("sst", -3, "write exclusive admin", 1, 1, 1), ) } // namespace redis diff --git a/src/server/server.h b/src/server/server.h index b8c314291..611b51678 100644 --- a/src/server/server.h +++ b/src/server/server.h 
@@ -228,6 +228,13 @@ class Server { void WakeupBlockingConns(const std::string &key, size_t n_conns); void OnEntryAddedToStream(const std::string &ns, const std::string &key, const redis::StreamEntryID &entry_id); + size_t GetReplicaCount() { + slave_threads_mu_.lock(); + auto replica_count = slave_threads_.size(); + slave_threads_mu_.unlock(); + return replica_count; + } + std::string GetLastRandomKeyCursor(); void SetLastRandomKeyCursor(const std::string &cursor); diff --git a/src/storage/storage.cc b/src/storage/storage.cc index e6bc881c4..b880f86d7 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -20,6 +20,7 @@ #include "storage.h" +#include <dirent.h> #include <event2/buffer.h> #include <fcntl.h> #include <glog/logging.h> @@ -769,6 +770,87 @@ rocksdb::Status Storage::FlushScripts(engine::Context &ctx, const rocksdb::Write return Write(ctx, options, batch->GetWriteBatch()); } +StatusOr<int> Storage::IngestSST(const std::string &sst_dir, const rocksdb::IngestExternalFileOptions &ingest_options) { + std::vector<std::string> sst_files; + auto s = env_->GetChildren(sst_dir, &sst_files); + if (!s.ok()) { + return {Status::NotOK, "Failed to open directory " + sst_dir + ": " + s.ToString()}; + } + + std::vector<std::string> filtered_files; + for (const auto &filename : sst_files) { + if (filename.length() >= 4 && filename.substr(filename.length() - 4) == ".sst") { + filtered_files.push_back(sst_dir + "/" + filename); + } + } + sst_files = std::move(filtered_files); + + if (sst_files.empty()) { + LOG(WARNING) << "No SST files found in " << sst_dir; + return 0; + } + + std::unordered_map<std::string_view, std::vector<std::string>> cf_files; + std::vector<std::string> cf_names; + for (const auto &cf : ColumnFamilyConfigs::ListAllColumnFamilies()) { + cf_names.emplace_back(cf.Name()); + } + + for (const auto &file : sst_files) { + bool matched = false; + for (const auto &cf_name : cf_names) { + if (file.find(cf_name) != std::string::npos) { + 
cf_files[cf_name].push_back(file); + matched = true; + break; + } + } + if (!matched) { + return {Status::NotOK, fmt::format("SST file '{}' does not match any known column family name", file)}; + } + } + + // Process each set of files with the appropriate column family + // By importing the specific column family SST files first, we avoid data corruption - + // if import fails, no data is made available or corrupted in either column family + // if the metadata import fails, the imported data will be deleted by the compaction. + rocksdb::Status status; + // Process files for each column family except metadata + for (const auto &[cf_name, files] : cf_files) { + if (cf_name == kMetadataColumnFamilyName) continue; + if (files.empty()) continue; + + rocksdb::ColumnFamilyHandle *cf_handle = nullptr; + // Find the correct CF handle + for (auto handle : cf_handles_) { + if (handle->GetName() == cf_name) { + cf_handle = handle; + break; + } + } + + status = ingestSST(cf_handle, ingest_options, files); + if (!status.ok()) { + return {Status::NotOK, status.ToString()}; + } + } + // Process metadata files + const auto &metadata_files = cf_files[kMetadataColumnFamilyName]; + if (!metadata_files.empty()) { + status = ingestSST(GetCFHandle(ColumnFamilyID::Metadata), ingest_options, metadata_files); + if (!status.ok()) { + return {Status::NotOK, status.ToString()}; + } + } + return sst_files.size(); +} + +rocksdb::Status Storage::ingestSST(rocksdb::ColumnFamilyHandle *cf_handle, + const rocksdb::IngestExternalFileOptions &options, + const std::vector<std::string> &sst_file_names) { + return db_->IngestExternalFile(cf_handle, sst_file_names, options); +} + Status Storage::ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch) { return applyWriteBatch(default_write_opts_, batch); } diff --git a/src/storage/storage.h b/src/storage/storage.h index a4563eabc..71d8b9537 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -270,6 +270,9 @@ class Storage { [[nodiscard]] 
rocksdb::Status Compact(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice *begin, const rocksdb::Slice *end); + [[nodiscard]] StatusOr<int> IngestSST(const std::string &folder, + const rocksdb::IngestExternalFileOptions &ingest_options); + rocksdb::DB *GetDB(); bool IsClosing() const { return db_closing_; } std::string GetName() const { return config_->db_name; } @@ -389,6 +392,8 @@ class Storage { rocksdb::Status writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates); void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s); Status applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch); + rocksdb::Status ingestSST(rocksdb::ColumnFamilyHandle *cf_handle, const rocksdb::IngestExternalFileOptions &options, + const std::vector<std::string> &sst_file_names); }; /// Context passes fixed snapshot and batch between APIs diff --git a/tests/gocase/go.mod b/tests/gocase/go.mod index f695ea41c..4160e3919 100644 --- a/tests/gocase/go.mod +++ b/tests/gocase/go.mod @@ -15,6 +15,7 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/ebitengine/purego v0.8.2 // indirect github.com/go-ole/go-ole v1.3.0 // indirect + github.com/linxGnu/grocksdb v1.9.9 github.com/lufia/plan9stats v0.0.0-20250303091104-876f3ea5145d // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect diff --git a/tests/gocase/go.sum b/tests/gocase/go.sum index 62d2d97e2..1dc47e066 100644 --- a/tests/gocase/go.sum +++ b/tests/gocase/go.sum @@ -15,6 +15,8 @@ github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod 
h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/linxGnu/grocksdb v1.9.9 h1:CzSS/vHLtVIdxdrjvqWR/sm93u/0eB6UcoO14YqObnw= +github.com/linxGnu/grocksdb v1.9.9/go.mod h1:C3CNe9UYc9hlEM2pC82AqiGS3LRW537u9LFV4wIZuHk= github.com/lufia/plan9stats v0.0.0-20250303091104-876f3ea5145d h1:fjMbDVUGsMQiVZnSQsmouYJvMdwsGiDipOZoN66v844= github.com/lufia/plan9stats v0.0.0-20250303091104-876f3ea5145d/go.mod h1:autxFIvghDt3jPTLoqZ9OZ7s9qTGNAWmYCjVFWPX/zg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/tests/gocase/unit/sst/sst_load_test.go b/tests/gocase/unit/sst/sst_load_test.go new file mode 100644 index 000000000..cd0e264a6 --- /dev/null +++ b/tests/gocase/unit/sst/sst_load_test.go @@ -0,0 +1,609 @@ +//go:build !(ignore_when_tsan || ignore_when_asan || ignore_when_ubsan) + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package sst + +import ( + "context" + "encoding/binary" + "fmt" + "log" + "math/rand" + "os" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/linxGnu/grocksdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/kvrocks/tests/gocase/util" +) + +const ( + DefaultKvrocksNamespace = "__namespace" + metaDataEncodingMask uint8 = 0x82 + versionCounterBits = 11 +) + +type Metadata struct { + Flags uint8 + Expire uint64 + Version uint64 + Size uint64 +} + +type SSTResponse struct { + filesLoaded int64 `redis:"files_loaded"` +} + +func NewMetadata() *Metadata { + src := rand.NewSource(time.Now().UnixNano()) + r := rand.New(src) + + timestamp := uint64(time.Now().UnixMicro()) + counter := uint64(r.Int63()) + version := (timestamp << versionCounterBits) + (counter % (1 << versionCounterBits)) + + return &Metadata{ + Flags: metaDataEncodingMask, + Version: version, + } +} + +func (m *Metadata) Encode() []byte { + buf := make([]byte, 25) // 1 + 8 + 8 + 8 bytes + buf[0] = m.Flags + binary.BigEndian.PutUint64(buf[1:], m.Expire) + binary.BigEndian.PutUint64(buf[9:], m.Version) + binary.BigEndian.PutUint64(buf[17:], m.Size) + return buf +} + +func encodeInternalKey(namespace, key, field string, version uint64) []byte { + nsLen := len(namespace) + keyLen := len(key) + fieldLen := len(field) + + // Pre-calculate total size: 1 byte for ns size + ns + 4 bytes for key size + key + 8 bytes for version + field + out := make([]byte, 1+nsLen+4+keyLen+8+fieldLen) + + out[0] = uint8(nsLen) + copy(out[1:], namespace) + binary.BigEndian.PutUint32(out[1+nsLen:], uint32(keyLen)) + copy(out[5+nsLen:], key) + binary.BigEndian.PutUint64(out[5+nsLen+keyLen:], version) + copy(out[13+nsLen+keyLen:], field) + + return out +} + +func encodeRedisHashKey(namespace, userKey string) []byte { + totalLen := 1 + len(namespace) + len(userKey) + buf := make([]byte, totalLen) + buf[0] = uint8(len(namespace)) + copy(buf[1:], namespace) + 
copy(buf[1+len(namespace):], userKey) + return buf +} + +func createSSTFile(filename string, data map[string]string) error { + envOpts := grocksdb.NewDefaultEnvOptions() + sstWriterOpts := grocksdb.NewDefaultOptions() + sstWriterOpts.SetCompression(grocksdb.CompressionType(2)) + sstWriter := grocksdb.NewSSTFileWriter(envOpts, sstWriterOpts) + defer sstWriter.Destroy() + + err := sstWriter.Open(filename) + if err != nil { + return fmt.Errorf("failed to open SST file writer: %v", err) + } + + // Get all keys and sort them + keys := make([]string, 0, len(data)) + for k := range data { + keys = append(keys, k) + } + sort.Strings(keys) + + // Add keys in sorted order + for _, k := range keys { + err = sstWriter.Add([]byte(k), []byte(data[k])) + if err != nil { + log.Printf("Error adding key %s to SST: %v", k, err) + } + } + + err = sstWriter.Finish() + if err != nil { + return fmt.Errorf("failed to finish SST file: %v", err) + } + return nil +} + +func makeTempDir() (string, error) { + return os.MkdirTemp("", "sst_test_*") +} + +func toInt64(val interface{}) (int64, error) { + switch v := val.(type) { + case int64: + return v, nil + case int: + return int64(v), nil + case float64: + return int64(v), nil + default: + return 0, fmt.Errorf("value is not a number, got %T", val) + } +} + +func ExtractSSTResponse(result interface{}) (*SSTResponse, error) { + resultMap, ok := result.(map[interface{}]interface{}) + if !ok { + return nil, fmt.Errorf("expected map[interface{}]interface{}, got %T", result) + } + response := &SSTResponse{} + for field, target := range map[string]*int64{ + "files_loaded": &response.filesLoaded, + } { + if val, ok := resultMap[field]; ok { + converted, err := toInt64(val) + if err != nil { + return nil, fmt.Errorf("%s: %v", field, err) + } + *target = converted + } + } + return response, nil +} + +func TestSSTLoad(t *testing.T) { + configOptions := []util.ConfigOptions{ + { + Name: "resp3-enabled", + Options: []string{"yes"}, + ConfigType: 
util.YesNo, + }, + } + configsMatrix, err := util.GenerateConfigsMatrix(configOptions) + require.NoError(t, err) + for _, configs := range configsMatrix { + testSSTLoad(t, configs) + } +} + +var testSSTLoad = func(t *testing.T, configs util.KvrocksServerConfigs) { + srv := util.StartServer(t, configs) + defer srv.Close() + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + t.Run("Test load cmd with no folder", func(t *testing.T) { + r := rdb.Do(ctx, "sst", "load") + assert.Error(t, r.Err()) + }) + + t.Run("Test wrong subcommand", func(t *testing.T) { + dir, err := makeTempDir() + require.NoError(t, err) + defer os.RemoveAll(dir) + r := rdb.Do(ctx, "sst", "wrong-sub-command", dir) + assert.Error(t, r.Err()) + }) + + t.Run("Test wrong load option", func(t *testing.T) { + dir, err := makeTempDir() + require.NoError(t, err) + defer os.RemoveAll(dir) + r := rdb.Do(ctx, "sst", "load", dir, "wrong-load-option") + assert.Error(t, r.Err()) + }) + + t.Run("Test empty folder", func(t *testing.T) { + dir, err := makeTempDir() + require.NoError(t, err) + defer os.RemoveAll(dir) + r := rdb.Do(ctx, "sst", "load", dir) + assert.NoError(t, r.Err()) + resp, err := ExtractSSTResponse(r.Val()) + assert.NoError(t, err) + assert.Equal(t, int64(0), resp.filesLoaded) + }) + + t.Run("Test load redis hash keys", func(t *testing.T) { + dir, err := makeTempDir() + require.NoError(t, err) + defer os.RemoveAll(dir) + + namespace := DefaultKvrocksNamespace + data := map[string][]map[string]string{ + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, 
util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + } + keys := make(map[string]string, len(data)) + metaKeys := make(map[string]string, len(data)) + for hashK := range data { + meta := NewMetadata() + fields := data[hashK] + for _, field := range fields { + for fieldK, fieldV := range field { + internalKey := encodeInternalKey(namespace, hashK, fieldK, meta.Version) + keys[string(internalKey)] = fieldV + } + } + hashKey := encodeRedisHashKey(namespace, hashK) + meta.Size++ + metaKeys[string(hashKey)] = string(meta.Encode()) + } + + err = createSSTFile(filepath.Join(dir, "kvrocks_default.sst"), keys) + assert.NoError(t, err) + err = createSSTFile(filepath.Join(dir, "kvrocks_metadata.sst"), metaKeys) + assert.NoError(t, err) + + r := rdb.Do(ctx, "sst", "load", dir) + assert.NoError(t, r.Err()) + resp, err := ExtractSSTResponse(r.Val()) + assert.NoError(t, err) + assert.Equal(t, int64(2), resp.filesLoaded) + + //verify files didn't get moved + _, err = os.Stat(filepath.Join(dir, "kvrocks_default.sst")) + assert.NoError(t, err) + _, err = os.Stat(filepath.Join(dir, "kvrocks_metadata.sst")) + assert.NoError(t, err) + + for hashK, fields := range data { + for _, field := range fields { + for fieldK, expectedVal := range field { + val := rdb.HGet(ctx, hashK, fieldK) + assert.NoError(t, val.Err()) + assert.Equal(t, expectedVal, val.Val(), "Hash field value mismatch for key:%s field:%s", hashK, fieldK) + } + } + } + }) + + t.Run("Test load and update redis hash keys", func(t *testing.T) { + dir, err := 
makeTempDir() + require.NoError(t, err) + defer os.RemoveAll(dir) + + namespace := DefaultKvrocksNamespace + data := map[string][]map[string]string{ + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + } + keys := make(map[string]string, len(data)) + metaKeys := make(map[string]string, len(data)) + for hashK := range data { + meta := NewMetadata() + fields := data[hashK] + for _, field := range fields { + for fieldK, fieldV := range field { + internalKey := encodeInternalKey(namespace, hashK, fieldK, meta.Version) + keys[string(internalKey)] = fieldV + } + } + hashKey := encodeRedisHashKey(namespace, hashK) + meta.Size++ + metaKeys[string(hashKey)] = string(meta.Encode()) + } + + err = createSSTFile(filepath.Join(dir, "kvrocks_default.sst"), keys) + assert.NoError(t, err) + err = createSSTFile(filepath.Join(dir, "kvrocks_metadata.sst"), metaKeys) + assert.NoError(t, err) + + r := rdb.Do(ctx, "sst", "load", dir) + assert.NoError(t, r.Err()) + resp, err := ExtractSSTResponse(r.Val()) + assert.NoError(t, err) + assert.Equal(t, int64(2), resp.filesLoaded) + + for hashK, 
fields := range data { + for _, field := range fields { + for fieldK, expectedVal := range field { + val := rdb.HGet(ctx, hashK, fieldK) + assert.NoError(t, val.Err()) + assert.Equal(t, expectedVal, val.Val(), "Hash field value mismatch for key:%s field:%s", hashK, fieldK) + } + } + } + // update the keys fields via redis interface + for hashK := range data { + fields := data[hashK] + for _, field := range fields { + for fieldK := range field { + newVal := "__avoid_collisions__" + util.RandString(1, 10, util.Alpha) + r := rdb.HSet(ctx, hashK, fieldK, newVal) + assert.NoError(t, r.Err()) + field[fieldK] = newVal + } + } + } + // validate the updates made via redis interface + for hashK, fields := range data { + for _, field := range fields { + for fieldK, expectedVal := range field { + val := rdb.HGet(ctx, hashK, fieldK) + assert.NoError(t, val.Err()) + assert.Equal(t, expectedVal, val.Val(), "Hash field value mismatch for key:%s field:%s", hashK, fieldK) + } + } + } + // update the values and sst load + for hashK := range data { + fields := data[hashK] + for _, field := range fields { + for fieldK := range field { + newVal := "__avoid_collisions__" + util.RandString(1, 10, util.Alpha) + field[fieldK] = newVal + } + } + } + + keys = make(map[string]string, len(data)) + metaKeys = make(map[string]string, len(data)) + for hashK := range data { + meta := NewMetadata() + fields := data[hashK] + for _, field := range fields { + for fieldK, fieldV := range field { + internalKey := encodeInternalKey(namespace, hashK, fieldK, meta.Version) + keys[string(internalKey)] = fieldV + } + } + hashKey := encodeRedisHashKey(namespace, hashK) + meta.Size++ + metaKeys[string(hashKey)] = string(meta.Encode()) + } + + err = createSSTFile(filepath.Join(dir, "kvrocks_default.sst"), keys) + assert.NoError(t, err) + err = createSSTFile(filepath.Join(dir, "kvrocks_metadata.sst"), metaKeys) + assert.NoError(t, err) + + r = rdb.Do(ctx, "sst", "load", dir) + assert.NoError(t, r.Err()) + resp, 
err = ExtractSSTResponse(r.Val()) + assert.NoError(t, err) + assert.Equal(t, int64(2), resp.filesLoaded) + + for hashK, fields := range data { + for _, field := range fields { + for fieldK, expectedVal := range field { + val := rdb.HGet(ctx, hashK, fieldK) + assert.NoError(t, val.Err()) + assert.Equal(t, expectedVal, val.Val(), "Hash field value mismatch for key:%s field:%s", hashK, fieldK) + } + } + } + }) + + t.Run("Test load redis hash keys with move option", func(t *testing.T) { + dir, err := makeTempDir() + require.NoError(t, err) + defer os.RemoveAll(dir) + + namespace := DefaultKvrocksNamespace + data := map[string][]map[string]string{ + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + } + keys := make(map[string]string, len(data)) + metaKeys := make(map[string]string, len(data)) + for hashK := range data { + meta := NewMetadata() + fields := data[hashK] + for _, field := range fields { + for fieldK, fieldV := range field { + internalKey := encodeInternalKey(namespace, hashK, fieldK, meta.Version) + keys[string(internalKey)] = fieldV + } + } + hashKey := 
encodeRedisHashKey(namespace, hashK) + meta.Size++ + metaKeys[string(hashKey)] = string(meta.Encode()) + } + + err = createSSTFile(filepath.Join(dir, "kvrocks_default.sst"), keys) + assert.NoError(t, err) + err = createSSTFile(filepath.Join(dir, "kvrocks_metadata.sst"), metaKeys) + assert.NoError(t, err) + + r := rdb.Do(ctx, "sst", "load", dir, "movefiles", "yes") + assert.NoError(t, r.Err()) + resp, err := ExtractSSTResponse(r.Val()) + assert.NoError(t, err) + assert.Equal(t, int64(2), resp.filesLoaded) + + //verify files did get moved + _, err = os.Stat(filepath.Join(dir, "kvrocks_default.sst")) + assert.True(t, os.IsNotExist(err)) + _, err = os.Stat(filepath.Join(dir, "kvrocks_metadata.sst")) + assert.True(t, os.IsNotExist(err)) + + for hashK, fields := range data { + for _, field := range fields { + for fieldK, expectedVal := range field { + val := rdb.HGet(ctx, hashK, fieldK) + assert.NoError(t, val.Err()) + assert.Equal(t, expectedVal, val.Val(), "Hash field value mismatch for key:%s field:%s", hashK, fieldK) + } + } + } + }) + + t.Run("Test load redis hash keys with no metadata entries", func(t *testing.T) { + dir, err := makeTempDir() + require.NoError(t, err) + defer os.RemoveAll(dir) + + namespace := DefaultKvrocksNamespace + data := map[string][]map[string]string{ + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, 
util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + } + keys := make(map[string]string, len(data)) + metaKeys := make(map[string]string, len(data)) + for hashK := range data { + meta := NewMetadata() + fields := data[hashK] + for _, field := range fields { + for fieldK, fieldV := range field { + internalKey := encodeInternalKey(namespace, hashK, fieldK, meta.Version) + keys[string(internalKey)] = fieldV + } + } + hashKey := encodeRedisHashKey(namespace, hashK) + meta.Size++ + metaKeys[string(hashKey)] = string(meta.Encode()) + } + + err = createSSTFile(filepath.Join(dir, "kvrocks_default.sst"), keys) + assert.NoError(t, err) + + r := rdb.Do(ctx, "sst", "load", dir) + assert.NoError(t, r.Err()) + resp, err := ExtractSSTResponse(r.Val()) + assert.NoError(t, err) + assert.Equal(t, int64(1), resp.filesLoaded) + + for hashK, fields := range data { + for _, field := range fields { + for fieldK := range field { + val := rdb.HGet(ctx, hashK, fieldK) + assert.Error(t, val.Err()) + } + } + } + }) + + t.Run("Test load redis hash keys with expiration", func(t *testing.T) { + dir, err := makeTempDir() + require.NoError(t, err) + defer os.RemoveAll(dir) + + namespace := DefaultKvrocksNamespace + data := map[string][]map[string]string{ + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + "__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + 
"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): { + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + {"__avoid_collisions__" + util.RandString(1, 10, util.Alpha): "__avoid_collisions__" + util.RandString(1, 10, util.Alpha)}, + }, + } + keys := make(map[string]string, len(data)) + metaKeys := make(map[string]string, len(data)) + for hashK := range data { + meta := NewMetadata() + meta.Expire = uint64(time.Now().Add(5 * time.Second).UnixMilli()) + fields := data[hashK] + for _, field := range fields { + for fieldK, fieldV := range field { + internalKey := encodeInternalKey(namespace, hashK, fieldK, meta.Version) + keys[string(internalKey)] = fieldV + } + } + hashKey := encodeRedisHashKey(namespace, hashK) + meta.Size++ + metaKeys[string(hashKey)] = string(meta.Encode()) + } + + err = createSSTFile(filepath.Join(dir, "kvrocks_default.sst"), keys) + assert.NoError(t, err) + err = createSSTFile(filepath.Join(dir, "kvrocks_metadata.sst"), metaKeys) + assert.NoError(t, err) + + r := rdb.Do(ctx, "sst", "load", dir) + assert.NoError(t, r.Err()) + resp, err := ExtractSSTResponse(r.Val()) + assert.NoError(t, err) + assert.Equal(t, int64(2), resp.filesLoaded) + + // verify keys have expiration + for hashK := range data { + expireDuration := rdb.ExpireTime(ctx, hashK) + assert.NotEmpty(t, expireDuration.Val().Milliseconds()) + } + //verify keys have expired + time.Sleep(5 * time.Second) + for hashK, fields := range data { + for _, field := range fields { + for fieldK := range field { + val := rdb.HGet(ctx, hashK, fieldK) + assert.Error(t, val.Err()) + } + } + } + }) +} diff --git a/x.py b/x.py index deb3b4523..5ab66c07c 100755 --- a/x.py +++ b/x.py @@ -223,6 +223,60 @@ def clang_tidy(dir: str, jobs: Optional[int], clang_tidy_path: str, run_clang_ti run(run_command, 
def is_rocky_linux():
    """Return True when running on Rocky Linux 8 or 9, per /etc/os-release.

    Returns False (after printing a diagnostic) when the file is missing or
    cannot be parsed, so callers can use this as a safe feature probe.
    """
    try:
        with open('/etc/os-release') as f:
            data = {}
            for line in f:
                if '=' in line:
                    key, value = line.strip().split('=', 1)
                    data[key] = value.strip('"')

        # Compare the major version only: "8.9" -> "8".
        major_version = data.get('VERSION_ID', '').split('.')[0]
        return data.get('ID') == 'rocky' and major_version in ('8', '9')
    except FileNotFoundError:
        print("File /etc/os-release not found.")
        return False
    except Exception as e:
        print(f"Unexpected error: {e}")
        return False


def get_custom_env():
    """Build the environment for `go` invocations that need CGO.

    Points CGO_CFLAGS/CGO_LDFLAGS at the in-tree RocksDB and compression
    library build artifacts under build/_deps, and returns a copy of
    os.environ with those variables set.
    """
    base = Path(__file__).parent.absolute()
    rocksdb_inc = base.joinpath('build/_deps/rocksdb-src/include')
    rocksdb_lib = base.joinpath('build/_deps/rocksdb-build')
    zstd_lib = base.joinpath('build/_deps/zstd-src/lib')   # was misleadingly named "zlib"
    lz4_lib = base.joinpath('build/_deps/lz4-src/lib')     # was misleadingly named "z4lib"
    snappy_lib = base.joinpath('build/_deps/snappy-build')

    env = os.environ.copy()
    extra_flags = []

    if is_rocky_linux():
        # Rocky's system libstdc++ is too old; link against the
        # gcc-toolset-12 copy (filter out the 32-bit variants).
        output = run_pipe("find", "/opt/rh/gcc-toolset-12/root/", "-name", "libstdc++.so*")
        output = run_pipe("grep", "-v", "32", stdin=output)
        output = run_pipe("xargs", "dirname", stdin=output)
        libstdc_folder = output.read().strip()
        if libstdc_folder:
            extra_flags.append(f"-L{libstdc_folder}/ -lstdc++")

    # BUGFIX: flags were previously concatenated with no separator
    # (`additional_flags += "-lgcov"`), producing "-lstdc++-lgcov" on
    # Rocky coverage builds. Collect flags in a list and space-join them.
    if env.get("WITH_COVERAGE", "false") == "true":
        extra_flags.append("-lgcov")

    env.update({
        "CGO_CFLAGS": f"-I{rocksdb_inc}",
        "CGO_LDFLAGS": (
            f"-L{rocksdb_lib} -L{zstd_lib} -L{lz4_lib} -L{snappy_lib} "
            + " ".join(extra_flags)
        ),
    })
    return env
GOLANGCI_LINT_REQUIRED_VERSION, "golangci-lint") basedir = Path(__file__).parent.absolute() / 'tests' / 'gocase' - run(binpath_str, 'run', '-v', './...', cwd=str(basedir), verbose=True) + run(binpath_str, 'run', '-v', './...', cwd=str(basedir), verbose=True, env=get_custom_env()) def write_version(release_version: str) -> str: @@ -322,7 +376,7 @@ def test_go(dir: str, cli_path: str, rest: List[str]) -> None: *rest ] - run(go, *args, cwd=str(basedir), verbose=True) + run(go, *args, cwd=str(basedir), verbose=True, env=get_custom_env()) if __name__ == '__main__':