This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 51569adc feat: add `iter->status()` check for loop iterators (#2395)
51569adc is described below

commit 51569adcd23e196fc2dabb425c1fc6375d0f3b70
Author: Edward Xu <[email protected]>
AuthorDate: Sun Jul 7 19:50:05 2024 +0800

    feat: add `iter->status()` check for loop iterators (#2395)
---
 src/cluster/slot_migrate.cc | 16 ++++++++++++++++
 src/search/index_manager.h  |  4 ++++
 src/search/indexer.cc       |  4 ++++
 src/storage/redis_db.cc     | 11 ++++++++++-
 src/storage/scripting.cc    |  8 ++++++++
 src/types/redis_stream.cc   | 29 +++++++++++++++++++++++++++--
 6 files changed, 69 insertions(+), 3 deletions(-)

diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index a074d536..03625dc6 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -382,6 +382,12 @@ Status SlotMigrator::sendSnapshotByCmd() {
     }
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    auto err_str = s.ToString();
+    LOG(ERROR) << "[migrate] Failed to iterate keys of slot " << slot << ": " << err_str;
+    return {Status::NotOK, fmt::format("failed to iterate keys of slot {}: {}", slot, err_str)};
+  }
+
   // It's necessary to send commands that are still in the pipeline since the final pipeline may not be sent
   // while iterating keys because its size could be less than max_pipeline_size_
   auto s = sendCmdsPipelineIfNeed(&restore_cmds, true);
@@ -820,6 +826,11 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
     }
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return {Status::NotOK,
+            fmt::format("failed to iterate values of the complex key {}: {}", key.ToString(), s.ToString())};
+  }
+
   // Have to check the item count of the last command list
   if (item_count % kMaxItemsInCommand != 0) {
     *restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
@@ -880,6 +891,11 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad
     }
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return {Status::NotOK,
+            fmt::format("failed to iterate values of the stream key {}: {}", key.ToString(), s.ToString())};
+  }
+
   // commands like XTRIM and XDEL affect stream's metadata, but we use only XADD for a slot migration
   // XSETID is used to adjust stream's info on the destination node according to the current values on the source
   *restore_cmds += redis::ArrayOfBulkStrings({"XSETID", key.ToString(), metadata.last_generated_id.ToString(),
diff --git a/src/search/index_manager.h b/src/search/index_manager.h
index 1d744704..0f90961f 100644
--- a/src/search/index_manager.h
+++ b/src/search/index_manager.h
@@ -129,6 +129,10 @@ struct IndexManager {
       index_map.Insert(std::move(info));
     }
 
+    if (auto s = iter->status(); !s.ok()) {
+      return {Status::NotOK, fmt::format("fail to load index metadata: {}", s.ToString())};
+    }
+
     return Status::OK();
   }
 
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index 7ce0b3d0..1212dd2f 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -334,6 +334,10 @@ Status IndexUpdater::Build() const {
       if (s.Is<Status::TypeMismatched>()) continue;
       if (!s.OK()) return s;
     }
+
+    if (auto s = iter->status(); !s.ok()) {
+      return {Status::NotOK, s.ToString()};
+    }
   }
 
   return Status::OK();
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 4f08490b..0fd31137 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -299,6 +299,10 @@ rocksdb::Status Database::Keys(const std::string &prefix, std::vector<std::strin
       }
     }
 
+    if (auto s = iter->status(); !s.ok()) {
+      return s;
+    }
+
     if (!storage_->IsSlotIdEncoded()) break;
     if (prefix.empty()) break;
     if (++slot_id >= HASH_SLOTS_SIZE) break;
@@ -368,6 +372,11 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
       keys->emplace_back(user_key);
       cnt++;
     }
+
+    if (auto s = iter->status(); !s.ok()) {
+      return s;
+    }
+
     if (!storage_->IsSlotIdEncoded() || prefix.empty()) {
       if (!keys->empty() && cnt >= limit) {
         end_cursor->append(user_key);
@@ -587,7 +596,7 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
       break;
     }
   }
-  return rocksdb::Status::OK();
+  return iter->status();
 }
 
 RedisType WriteBatchLogData::GetRedisType() const { return type_; }
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index 5197eb5c..987f9f0d 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -452,6 +452,10 @@ Status FunctionList(Server *srv, const redis::Connection *conn, const std::strin
     result.emplace_back(lib.ToString(), iter->value().ToString());
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return {Status::NotOK, s.ToString()};
+  }
+
   output->append(redis::MultiLen(result.size()));
   for (const auto &[lib, code] : result) {
     output->append(conn->HeaderOfMap(with_code ? 2 : 1));
@@ -488,6 +492,10 @@ Status FunctionListFunc(Server *srv, const redis::Connection *conn, const std::s
     result.emplace_back(func.ToString(), iter->value().ToString());
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return {Status::NotOK, s.ToString()};
+  }
+
   output->append(redis::MultiLen(result.size()));
   for (const auto &[func, lib] : result) {
     output->append(conn->HeaderOfMap(2));
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 6bf03d34..97fcf948 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -654,6 +654,10 @@ rocksdb::Status Stream::AutoClaim(const Slice &stream_name, const std::string &g
     }
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return s;
+  }
+
   if (has_next_entry) {
     std::string tmp_group_name;
     StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
@@ -763,6 +767,10 @@ rocksdb::Status Stream::DestroyGroup(const Slice &stream_name, const std::string
     *delete_cnt += 1;
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return s;
+  }
+
   if (*delete_cnt != 0) {
     metadata.group_number -= 1;
     std::string metadata_bytes;
@@ -895,6 +903,11 @@ rocksdb::Status Stream::DestroyConsumer(const Slice &stream_name, const std::str
       }
     }
   }
+
+  if (auto s = iter->status(); !s.ok()) {
+    return s;
+  }
+
   batch->Delete(stream_cf_handle_, consumer_key);
   StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);
   group_metadata.consumer_number -= 1;
@@ -1101,6 +1114,10 @@ rocksdb::Status Stream::Len(const Slice &stream_name, const StreamLenOptions &op
     }
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return s;
+  }
+
   return rocksdb::Status::OK();
 }
 
@@ -1178,6 +1195,10 @@ rocksdb::Status Stream::range(const std::string &ns_key, const StreamMetadata &m
     }
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return s;
+  }
+
   return rocksdb::Status::OK();
 }
 
@@ -1341,7 +1362,7 @@ rocksdb::Status Stream::GetGroupInfo(const Slice &stream_name,
       group_metadata.push_back(tmp_item);
     }
   }
-  return rocksdb::Status::OK();
+  return iter->status();
 }
 
 rocksdb::Status Stream::GetConsumerInfo(
@@ -1375,7 +1396,7 @@ rocksdb::Status Stream::GetConsumerInfo(
       consumer_metadata.push_back(tmp_item);
     }
   }
-  return rocksdb::Status::OK();
+  return iter->status();
 }
 
 rocksdb::Status Stream::Range(const Slice &stream_name, const StreamRangeOptions &options,
@@ -1539,6 +1560,10 @@ rocksdb::Status Stream::RangeWithPending(const Slice &stream_name, StreamRangeOp
         if (count >= options.count) break;
       }
     }
+
+    if (auto s = iter->status(); !s.ok()) {
+      return s;
+    }
   }
   batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(consumergroup_metadata));
   batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));

Reply via email to