git-hulk commented on code in PR #3269:
URL: https://github.com/apache/kvrocks/pull/3269#discussion_r2600785585


##########
src/storage/storage.cc:
##########
@@ -338,6 +339,7 @@ Status Storage::Open(DBOpenMode mode) {
   subkey_opts.disable_auto_compactions = 
config_->rocks_db.disable_auto_compactions;
   subkey_opts.table_properties_collector_factories.emplace_back(
       
NewCompactOnExpiredTableCollectorFactory(std::string(kPrimarySubkeyColumnFamilyName),
 0.3));
+  subkey_opts.merge_operator = std::make_shared<HashMergeOperator>();

Review Comment:
   The merge operator would be invoked when iterating over any keys in this 
column family, and it could significantly affect performance.



##########
src/types/redis_hash.cc:
##########
@@ -428,4 +516,200 @@ rocksdb::Status Hash::RandField(engine::Context &ctx, 
const Slice &user_key, int
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status Hash::ExpireFields(engine::Context &ctx, const Slice 
&user_key, uint64_t expire_ms,
+                                   const std::vector<Slice> &fields, 
std::vector<FieldExpireResult> *results) {
+  results->clear();
+  results->reserve(fields.size());
+
+  std::string ns_key = AppendNamespacePrefix(user_key);
+  HashMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
+  if (!s.ok()) {
+    // Key doesn't exist - all fields don't exist
+    results->assign(fields.size(), FieldExpireResult::kFieldNotFound);
+    return s.IsNotFound() ? rocksdb::Status::OK() : s;
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisHash);
+  s = batch->PutLogData(log_data.Encode());
+  if (!s.ok()) return s;
+
+  for (const auto &field : fields) {
+    std::string sub_key = InternalKey(ns_key, field, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+    std::string raw_value;
+    s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &raw_value);
+
+    if (s.IsNotFound()) {
+      results->push_back(FieldExpireResult::kFieldNotFound);  // Field doesn't 
exist
+      continue;
+    }
+    if (!s.ok()) return s;
+
+    // Decode existing value
+    HashFieldValue field_value;
+    if (!HashFieldValue::Decode(raw_value, &field_value)) {
+      return rocksdb::Status::Corruption("failed to decode hash field value");
+    }
+
+    // Check if field is already expired
+    if (field_value.IsExpired()) {
+      results->push_back(FieldExpireResult::kFieldNotFound);  // Treat as 
non-existent
+      continue;
+    }
+
+    // Set new expiration - create a new HashFieldValue with the value and new 
expiration
+    HashFieldValue new_field_value(field_value.value, expire_ms);
+    std::string encoded_value;
+    new_field_value.Encode(&encoded_value);
+    s = batch->Put(sub_key, encoded_value);
+    if (!s.ok()) return s;
+    results->push_back(FieldExpireResult::kExpireSet);  // Expiration set 
successfully
+  }
+
+  return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+}
+
+rocksdb::Status Hash::TTLFields(engine::Context &ctx, const Slice &user_key, 
const std::vector<Slice> &fields,
+                                std::vector<int64_t> *results) {
+  results->clear();
+  results->reserve(fields.size());
+
+  std::string ns_key = AppendNamespacePrefix(user_key);
+  HashMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
+  if (!s.ok()) {
+    // Key doesn't exist - all fields don't exist
+    results->assign(fields.size(), -2);
+    return s.IsNotFound() ? rocksdb::Status::OK() : s;
+  }
+
+  for (const auto &field : fields) {
+    std::string sub_key = InternalKey(ns_key, field, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+    std::string raw_value;
+    s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &raw_value);
+
+    if (s.IsNotFound()) {
+      results->push_back(-2);  // Field doesn't exist
+      continue;
+    }
+    if (!s.ok()) return s;
+
+    // Decode existing value
+    HashFieldValue field_value;
+    if (!HashFieldValue::Decode(raw_value, &field_value)) {
+      return rocksdb::Status::Corruption("failed to decode hash field value");
+    }
+
+    // Check if field is already expired
+    if (field_value.IsExpired()) {
+      results->push_back(-2);  // Treat as non-existent
+      continue;
+    }
+
+    // Return TTL
+    results->push_back(field_value.TTL());
+  }
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status Hash::PersistFields(engine::Context &ctx, const Slice 
&user_key, const std::vector<Slice> &fields,
+                                    std::vector<FieldPersistResult> *results) {
+  results->clear();
+  results->reserve(fields.size());
+
+  std::string ns_key = AppendNamespacePrefix(user_key);
+  HashMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
+  if (!s.ok()) {
+    // Key doesn't exist - all fields don't exist
+    results->assign(fields.size(), FieldPersistResult::kFieldNotFound);
+    return s.IsNotFound() ? rocksdb::Status::OK() : s;
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisHash);
+  s = batch->PutLogData(log_data.Encode());
+  if (!s.ok()) return s;
+
+  bool has_updates = false;
+  for (const auto &field : fields) {
+    std::string sub_key = InternalKey(ns_key, field, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+    std::string raw_value;
+    s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &raw_value);
+
+    if (s.IsNotFound()) {
+      results->push_back(FieldPersistResult::kFieldNotFound);  // Field 
doesn't exist
+      continue;
+    }
+    if (!s.ok()) return s;
+
+    // Decode existing value
+    HashFieldValue field_value;
+    if (!HashFieldValue::Decode(raw_value, &field_value)) {
+      return rocksdb::Status::Corruption("failed to decode hash field value");
+    }
+
+    // Check if field is already expired
+    if (field_value.IsExpired()) {
+      results->push_back(FieldPersistResult::kFieldNotFound);  // Treat as 
non-existent
+      continue;
+    }
+
+    // Check if field has expiration
+    if (field_value.expire == 0) {
+      results->push_back(FieldPersistResult::kNotVolatile);  // Field exists 
but has no TTL
+      continue;
+    }
+
+    // Remove expiration - create a new HashFieldValue with the value and no 
expiration
+    HashFieldValue new_field_value(field_value.value, 0);
+    std::string encoded_value;
+    new_field_value.Encode(&encoded_value);
+    s = batch->Put(sub_key, encoded_value);
+    if (!s.ok()) return s;
+    has_updates = true;
+    results->push_back(FieldPersistResult::kPersisted);  // Expiration removed 
successfully
+  }
+
+  if (has_updates) {
+    return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+  }
+  return rocksdb::Status::OK();
+}
+
+void Hash::asyncRepairHash(const std::string &ns_key, const Slice &field, 
const HashMetadata &metadata) const {

Review Comment:
   It's not a good idea to spawn a thread per expired hash field. Perhaps we 
can reuse the existing task runner, or have a fixed thread pool for this 
purpose.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to