mapleFU commented on code in PR #2332: URL: https://github.com/apache/kvrocks/pull/2332#discussion_r1713169212
########## src/storage/batch_indexer.h: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include <rocksdb/db.h> +#include <rocksdb/slice.h> +#include <rocksdb/utilities/write_batch_with_index.h> + +#include <string> +#include <vector> + +#include "storage.h" + +// WriteBatchIndexer traverses the operations in WriteBatch and append to the specified WriteBatchWithIndex Review Comment: ```suggestion // WriteBatchIndexer traverses the operations in WriteBatch and appends to the specified WriteBatchWithIndex ``` ########## src/storage/batch_indexer.h: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include <rocksdb/db.h> +#include <rocksdb/slice.h> +#include <rocksdb/utilities/write_batch_with_index.h> + +#include <string> +#include <vector> + +#include "storage.h" + +// WriteBatchIndexer traverses the operations in WriteBatch and append to the specified WriteBatchWithIndex +class WriteBatchIndexer : public rocksdb::WriteBatch::Handler { + public: + explicit WriteBatchIndexer(engine::Storage* storage, rocksdb::WriteBatchWithIndex* dest_batch) + : storage_(storage), dest_batch_(dest_batch) {} + rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, const rocksdb::Slice& value) override { + return dest_batch_->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key, value); + } + + void Put(const rocksdb::Slice& key, const rocksdb::Slice& value) override { dest_batch_->Put(key, value); } + + rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { + return dest_batch_->Delete(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key); + } + + void Delete(const rocksdb::Slice& key) override { dest_batch_->Delete(key); } + + rocksdb::Status SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { + return dest_batch_->SingleDelete(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key); + } + + void SingleDelete(const rocksdb::Slice& key) override { dest_batch_->SingleDelete(key); } + + rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice& begin_key, + const 
rocksdb::Slice& end_key) override { + // The latest snapshot (default ReadOptions) needs to be used here. + // If an old snapshot is used here and writebatchwithindex is still being built, + // the newly added keys after the snapshot may not be obtained. + rocksdb::ReadOptions read_options; + auto cf_handle = storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)); + std::unique_ptr<rocksdb::Iterator> it(storage_->GetDB()->NewIterator(read_options, cf_handle)); + for (it->Seek(begin_key); it->Valid() && it->key().compare(end_key) < 0; it->Next()) { + auto s = dest_batch_->Delete(cf_handle, it->key()); + if (!s.ok()) { + return s; + } + } + return rocksdb::Status::OK(); + } + + rocksdb::Status MergeCF(uint32_t column_family_id, const rocksdb::Slice& key, const rocksdb::Slice& value) override { + return dest_batch_->Merge(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key, value); + } + + void Merge(const rocksdb::Slice& key, const rocksdb::Slice& value) override { dest_batch_->Merge(key, value); } + + void LogData(const rocksdb::Slice& blob) override { dest_batch_->PutLogData(blob); } + + private: + engine::Storage* storage_; + rocksdb::WriteBatchWithIndex* dest_batch_; +}; Review Comment: nit: newline after this ########## src/storage/storage.h: ########## @@ -361,8 +368,75 @@ class Storage { rocksdb::WriteOptions default_write_opts_ = rocksdb::WriteOptions(); - rocksdb::Status writeToDB(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates); + rocksdb::Status writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates); void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s); }; +/// Context passes fixed snapshot and batche between APIs +/// +/// Limitations: Performing a large number of writes on the same Context may reduce performance. +/// Please choose to use the same Context or create a new Context based on the actual situation. 
+/// +/// Context does not provide thread safety guarantees and is generally only passed as a parameter between APIs. +struct Context { + engine::Storage *storage = nullptr; + /// Snapshot should be specified instead of nullptr when used, Review Comment: I guess this conflicts with `NonTransactionContext` ########## src/storage/storage.cc: ########## @@ -636,21 +648,31 @@ void Storage::recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_famil } } -rocksdb::Iterator *Storage::NewIterator(const rocksdb::ReadOptions &options, +rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family) { + if (ctx.is_txn_mode) { + DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber()); + } auto iter = db_->NewIterator(options, column_family); if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) { return txn_write_batch_->NewIteratorWithBase(column_family, iter, &options); + } else if (ctx.is_txn_mode && ctx.batch && ctx.batch->GetWriteBatch()->Count() > 0) { + return ctx.batch->NewIteratorWithBase(column_family, iter, &options); } return iter; } -void Storage::MultiGet(const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family, - const size_t num_keys, const rocksdb::Slice *keys, rocksdb::PinnableSlice *values, - rocksdb::Status *statuses) { +void Storage::MultiGet(engine::Context &ctx, const rocksdb::ReadOptions &options, + rocksdb::ColumnFamilyHandle *column_family, const size_t num_keys, const rocksdb::Slice *keys, + rocksdb::PinnableSlice *values, rocksdb::Status *statuses) { + if (ctx.is_txn_mode) { + DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber()); Review Comment: We'd better check `options.snapshot != nullptr` before checking equality ########## src/storage/batch_indexer.h: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include <rocksdb/db.h> +#include <rocksdb/slice.h> +#include <rocksdb/utilities/write_batch_with_index.h> + +#include <string> +#include <vector> + +#include "storage.h" + +// WriteBatchIndexer traverses the operations in WriteBatch and append to the specified WriteBatchWithIndex +class WriteBatchIndexer : public rocksdb::WriteBatch::Handler { + public: + explicit WriteBatchIndexer(engine::Storage* storage, rocksdb::WriteBatchWithIndex* dest_batch) + : storage_(storage), dest_batch_(dest_batch) {} Review Comment: How about adding a `DCHECK` that `storage_` and `dest_batch_` are not nullptr here? ########## src/storage/batch_indexer.h: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include <rocksdb/db.h> +#include <rocksdb/slice.h> +#include <rocksdb/utilities/write_batch_with_index.h> + +#include <string> +#include <vector> + +#include "storage.h" + +// WriteBatchIndexer traverses the operations in WriteBatch and append to the specified WriteBatchWithIndex +class WriteBatchIndexer : public rocksdb::WriteBatch::Handler { + public: + explicit WriteBatchIndexer(engine::Storage* storage, rocksdb::WriteBatchWithIndex* dest_batch) + : storage_(storage), dest_batch_(dest_batch) {} + rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, const rocksdb::Slice& value) override { + return dest_batch_->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key, value); + } + + void Put(const rocksdb::Slice& key, const rocksdb::Slice& value) override { dest_batch_->Put(key, value); } + + rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { + return dest_batch_->Delete(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key); + } + + void Delete(const rocksdb::Slice& key) override { dest_batch_->Delete(key); } + + rocksdb::Status SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { + return dest_batch_->SingleDelete(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key); + } + + void SingleDelete(const rocksdb::Slice& key) override { dest_batch_->SingleDelete(key); } + + rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice& begin_key, + const 
rocksdb::Slice& end_key) override { + // The latest snapshot (default ReadOptions) needs to be used here. + // If an old snapshot is used here and writebatchwithindex is still being built, + // the newly added keys after the snapshot may not be obtained. + rocksdb::ReadOptions read_options; + auto cf_handle = storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)); + std::unique_ptr<rocksdb::Iterator> it(storage_->GetDB()->NewIterator(read_options, cf_handle)); Review Comment: Better to set a lower_bound and upper_bound; this would help skip iterating over some key-values ########## src/storage/batch_indexer.h: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#pragma once + +#include <rocksdb/db.h> +#include <rocksdb/slice.h> +#include <rocksdb/utilities/write_batch_with_index.h> + +#include <string> +#include <vector> + +#include "storage.h" + +// WriteBatchIndexer traverses the operations in WriteBatch and append to the specified WriteBatchWithIndex +class WriteBatchIndexer : public rocksdb::WriteBatch::Handler { + public: + explicit WriteBatchIndexer(engine::Storage* storage, rocksdb::WriteBatchWithIndex* dest_batch) + : storage_(storage), dest_batch_(dest_batch) {} + rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, const rocksdb::Slice& value) override { + return dest_batch_->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key, value); + } + + void Put(const rocksdb::Slice& key, const rocksdb::Slice& value) override { dest_batch_->Put(key, value); } + + rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { + return dest_batch_->Delete(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key); + } + + void Delete(const rocksdb::Slice& key) override { dest_batch_->Delete(key); } + + rocksdb::Status SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { + return dest_batch_->SingleDelete(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key); + } + + void SingleDelete(const rocksdb::Slice& key) override { dest_batch_->SingleDelete(key); } + + rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice& begin_key, + const rocksdb::Slice& end_key) override { + // The latest snapshot (default ReadOptions) needs to be used here. + // If an old snapshot is used here and writebatchwithindex is still being built, Review Comment: "and writebatchwithindex is still being built" What does this mean here? 
########## src/storage/storage.h: ########## @@ -361,8 +368,75 @@ class Storage { rocksdb::WriteOptions default_write_opts_ = rocksdb::WriteOptions(); - rocksdb::Status writeToDB(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates); + rocksdb::Status writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates); void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s); }; +/// Context passes fixed snapshot and batche between APIs +/// +/// Limitations: Performing a large number of writes on the same Context may reduce performance. +/// Please choose to use the same Context or create a new Context based on the actual situation. +/// +/// Context does not provide thread safety guarantees and is generally only passed as a parameter between APIs. +struct Context { + engine::Storage *storage = nullptr; + /// Snapshot should be specified instead of nullptr when used, + /// and should be consistent with snapshot in ReadOptions to avoid ambiguity. + /// Normally it will be fixed to the latest Snapshot when the Context is constructed + const rocksdb::Snapshot *snapshot = nullptr; + std::unique_ptr<rocksdb::WriteBatchWithIndex> batch = nullptr; + + bool is_txn_mode = true; Review Comment: Nit: we can add some comments for `is_txn_mode` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
