This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 51569adc feat: add `iter->status()` check for loop iterators (#2395)
51569adc is described below
commit 51569adcd23e196fc2dabb425c1fc6375d0f3b70
Author: Edward Xu <[email protected]>
AuthorDate: Sun Jul 7 19:50:05 2024 +0800
feat: add `iter->status()` check for loop iterators (#2395)
---
src/cluster/slot_migrate.cc | 16 ++++++++++++++++
src/search/index_manager.h | 4 ++++
src/search/indexer.cc | 4 ++++
src/storage/redis_db.cc | 11 ++++++++++-
src/storage/scripting.cc | 8 ++++++++
src/types/redis_stream.cc | 29 +++++++++++++++++++++++++++--
6 files changed, 69 insertions(+), 3 deletions(-)
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index a074d536..03625dc6 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -382,6 +382,12 @@ Status SlotMigrator::sendSnapshotByCmd() {
}
}
+ if (auto s = iter->status(); !s.ok()) {
+ auto err_str = s.ToString();
+ LOG(ERROR) << "[migrate] Failed to iterate keys of slot " << slot << ": " << err_str;
+ return {Status::NotOK, fmt::format("failed to iterate keys of slot {}: {}", slot, err_str)};
+ }
+
// It's necessary to send commands that are still in the pipeline since the final pipeline may not be sent
// while iterating keys because its size could be less than max_pipeline_size_
auto s = sendCmdsPipelineIfNeed(&restore_cmds, true);
@@ -820,6 +826,11 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
}
}
+ if (auto s = iter->status(); !s.ok()) {
+ return {Status::NotOK,
+ fmt::format("failed to iterate values of the complex key {}: {}", key.ToString(), s.ToString())};
+ }
+
// Have to check the item count of the last command list
if (item_count % kMaxItemsInCommand != 0) {
*restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
@@ -880,6 +891,11 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad
}
}
+ if (auto s = iter->status(); !s.ok()) {
+ return {Status::NotOK,
+ fmt::format("failed to iterate values of the stream key {}: {}", key.ToString(), s.ToString())};
+ }
+
// commands like XTRIM and XDEL affect stream's metadata, but we use only XADD for a slot migration
// XSETID is used to adjust stream's info on the destination node according to the current values on the source
*restore_cmds += redis::ArrayOfBulkStrings({"XSETID", key.ToString(),
metadata.last_generated_id.ToString(),
diff --git a/src/search/index_manager.h b/src/search/index_manager.h
index 1d744704..0f90961f 100644
--- a/src/search/index_manager.h
+++ b/src/search/index_manager.h
@@ -129,6 +129,10 @@ struct IndexManager {
index_map.Insert(std::move(info));
}
+ if (auto s = iter->status(); !s.ok()) {
+ return {Status::NotOK, fmt::format("fail to load index metadata: {}", s.ToString())};
+ }
+
return Status::OK();
}
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index 7ce0b3d0..1212dd2f 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -334,6 +334,10 @@ Status IndexUpdater::Build() const {
if (s.Is<Status::TypeMismatched>()) continue;
if (!s.OK()) return s;
}
+
+ if (auto s = iter->status(); !s.ok()) {
+ return {Status::NotOK, s.ToString()};
+ }
}
return Status::OK();
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 4f08490b..0fd31137 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -299,6 +299,10 @@ rocksdb::Status Database::Keys(const std::string &prefix, std::vector<std::strin
}
}
+ if (auto s = iter->status(); !s.ok()) {
+ return s;
+ }
+
if (!storage_->IsSlotIdEncoded()) break;
if (prefix.empty()) break;
if (++slot_id >= HASH_SLOTS_SIZE) break;
@@ -368,6 +372,11 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
keys->emplace_back(user_key);
cnt++;
}
+
+ if (auto s = iter->status(); !s.ok()) {
+ return s;
+ }
+
if (!storage_->IsSlotIdEncoded() || prefix.empty()) {
if (!keys->empty() && cnt >= limit) {
end_cursor->append(user_key);
@@ -587,7 +596,7 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
break;
}
}
- return rocksdb::Status::OK();
+ return iter->status();
}
RedisType WriteBatchLogData::GetRedisType() const { return type_; }
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index 5197eb5c..987f9f0d 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -452,6 +452,10 @@ Status FunctionList(Server *srv, const redis::Connection *conn, const std::strin
result.emplace_back(lib.ToString(), iter->value().ToString());
}
+ if (auto s = iter->status(); !s.ok()) {
+ return {Status::NotOK, s.ToString()};
+ }
+
output->append(redis::MultiLen(result.size()));
for (const auto &[lib, code] : result) {
output->append(conn->HeaderOfMap(with_code ? 2 : 1));
@@ -488,6 +492,10 @@ Status FunctionListFunc(Server *srv, const redis::Connection *conn, const std::s
result.emplace_back(func.ToString(), iter->value().ToString());
}
+ if (auto s = iter->status(); !s.ok()) {
+ return {Status::NotOK, s.ToString()};
+ }
+
output->append(redis::MultiLen(result.size()));
for (const auto &[func, lib] : result) {
output->append(conn->HeaderOfMap(2));
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 6bf03d34..97fcf948 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -654,6 +654,10 @@ rocksdb::Status Stream::AutoClaim(const Slice &stream_name, const std::string &g
}
}
+ if (auto s = iter->status(); !s.ok()) {
+ return s;
+ }
+
if (has_next_entry) {
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
@@ -763,6 +767,10 @@ rocksdb::Status Stream::DestroyGroup(const Slice &stream_name, const std::string
*delete_cnt += 1;
}
+ if (auto s = iter->status(); !s.ok()) {
+ return s;
+ }
+
if (*delete_cnt != 0) {
metadata.group_number -= 1;
std::string metadata_bytes;
@@ -895,6 +903,11 @@ rocksdb::Status Stream::DestroyConsumer(const Slice &stream_name, const std::str
}
}
}
+
+ if (auto s = iter->status(); !s.ok()) {
+ return s;
+ }
+
batch->Delete(stream_cf_handle_, consumer_key);
StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);
group_metadata.consumer_number -= 1;
@@ -1101,6 +1114,10 @@ rocksdb::Status Stream::Len(const Slice &stream_name, const StreamLenOptions &op
}
}
+ if (auto s = iter->status(); !s.ok()) {
+ return s;
+ }
+
return rocksdb::Status::OK();
}
@@ -1178,6 +1195,10 @@ rocksdb::Status Stream::range(const std::string &ns_key, const StreamMetadata &m
}
}
+ if (auto s = iter->status(); !s.ok()) {
+ return s;
+ }
+
return rocksdb::Status::OK();
}
@@ -1341,7 +1362,7 @@ rocksdb::Status Stream::GetGroupInfo(const Slice &stream_name,
group_metadata.push_back(tmp_item);
}
}
- return rocksdb::Status::OK();
+ return iter->status();
}
rocksdb::Status Stream::GetConsumerInfo(
@@ -1375,7 +1396,7 @@ rocksdb::Status Stream::GetConsumerInfo(
consumer_metadata.push_back(tmp_item);
}
}
- return rocksdb::Status::OK();
+ return iter->status();
}
rocksdb::Status Stream::Range(const Slice &stream_name, const StreamRangeOptions &options,
@@ -1539,6 +1560,10 @@ rocksdb::Status Stream::RangeWithPending(const Slice &stream_name, StreamRangeOp
if (count >= options.count) break;
}
}
+
+ if (auto s = iter->status(); !s.ok()) {
+ return s;
+ }
}
batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(consumergroup_metadata));
batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));