git-hulk commented on code in PR #3269:
URL: https://github.com/apache/kvrocks/pull/3269#discussion_r2600785585
##########
src/storage/storage.cc:
##########
@@ -338,6 +339,7 @@ Status Storage::Open(DBOpenMode mode) {
subkey_opts.disable_auto_compactions =
config_->rocks_db.disable_auto_compactions;
subkey_opts.table_properties_collector_factories.emplace_back(
NewCompactOnExpiredTableCollectorFactory(std::string(kPrimarySubkeyColumnFamilyName),
0.3));
+ subkey_opts.merge_operator = std::make_shared<HashMergeOperator>();
Review Comment:
The merge operator would be invoked whenever keys in this column family are
iterated over, which could significantly degrade performance.
##########
src/types/redis_hash.cc:
##########
@@ -428,4 +516,200 @@ rocksdb::Status Hash::RandField(engine::Context &ctx,
const Slice &user_key, int
return rocksdb::Status::OK();
}
+rocksdb::Status Hash::ExpireFields(engine::Context &ctx, const Slice
&user_key, uint64_t expire_ms,
+ const std::vector<Slice> &fields,
std::vector<FieldExpireResult> *results) {
+ results->clear();
+ results->reserve(fields.size());
+
+ std::string ns_key = AppendNamespacePrefix(user_key);
+ HashMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
+ if (!s.ok()) {
+ // Key doesn't exist - all fields don't exist
+ results->assign(fields.size(), FieldExpireResult::kFieldNotFound);
+ return s.IsNotFound() ? rocksdb::Status::OK() : s;
+ }
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisHash);
+ s = batch->PutLogData(log_data.Encode());
+ if (!s.ok()) return s;
+
+ for (const auto &field : fields) {
+ std::string sub_key = InternalKey(ns_key, field, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ std::string raw_value;
+ s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &raw_value);
+
+ if (s.IsNotFound()) {
+ results->push_back(FieldExpireResult::kFieldNotFound); // Field doesn't
exist
+ continue;
+ }
+ if (!s.ok()) return s;
+
+ // Decode existing value
+ HashFieldValue field_value;
+ if (!HashFieldValue::Decode(raw_value, &field_value)) {
+ return rocksdb::Status::Corruption("failed to decode hash field value");
+ }
+
+ // Check if field is already expired
+ if (field_value.IsExpired()) {
+ results->push_back(FieldExpireResult::kFieldNotFound); // Treat as
non-existent
+ continue;
+ }
+
+ // Set new expiration - create a new HashFieldValue with the value and new
expiration
+ HashFieldValue new_field_value(field_value.value, expire_ms);
+ std::string encoded_value;
+ new_field_value.Encode(&encoded_value);
+ s = batch->Put(sub_key, encoded_value);
+ if (!s.ok()) return s;
+ results->push_back(FieldExpireResult::kExpireSet); // Expiration set
successfully
+ }
+
+ return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+}
+
+rocksdb::Status Hash::TTLFields(engine::Context &ctx, const Slice &user_key,
const std::vector<Slice> &fields,
+ std::vector<int64_t> *results) {
+ results->clear();
+ results->reserve(fields.size());
+
+ std::string ns_key = AppendNamespacePrefix(user_key);
+ HashMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
+ if (!s.ok()) {
+ // Key doesn't exist - all fields don't exist
+ results->assign(fields.size(), -2);
+ return s.IsNotFound() ? rocksdb::Status::OK() : s;
+ }
+
+ for (const auto &field : fields) {
+ std::string sub_key = InternalKey(ns_key, field, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ std::string raw_value;
+ s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &raw_value);
+
+ if (s.IsNotFound()) {
+ results->push_back(-2); // Field doesn't exist
+ continue;
+ }
+ if (!s.ok()) return s;
+
+ // Decode existing value
+ HashFieldValue field_value;
+ if (!HashFieldValue::Decode(raw_value, &field_value)) {
+ return rocksdb::Status::Corruption("failed to decode hash field value");
+ }
+
+ // Check if field is already expired
+ if (field_value.IsExpired()) {
+ results->push_back(-2); // Treat as non-existent
+ continue;
+ }
+
+ // Return TTL
+ results->push_back(field_value.TTL());
+ }
+
+ return rocksdb::Status::OK();
+}
+
+rocksdb::Status Hash::PersistFields(engine::Context &ctx, const Slice
&user_key, const std::vector<Slice> &fields,
+ std::vector<FieldPersistResult> *results) {
+ results->clear();
+ results->reserve(fields.size());
+
+ std::string ns_key = AppendNamespacePrefix(user_key);
+ HashMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
+ if (!s.ok()) {
+ // Key doesn't exist - all fields don't exist
+ results->assign(fields.size(), FieldPersistResult::kFieldNotFound);
+ return s.IsNotFound() ? rocksdb::Status::OK() : s;
+ }
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisHash);
+ s = batch->PutLogData(log_data.Encode());
+ if (!s.ok()) return s;
+
+ bool has_updates = false;
+ for (const auto &field : fields) {
+ std::string sub_key = InternalKey(ns_key, field, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ std::string raw_value;
+ s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &raw_value);
+
+ if (s.IsNotFound()) {
+ results->push_back(FieldPersistResult::kFieldNotFound); // Field
doesn't exist
+ continue;
+ }
+ if (!s.ok()) return s;
+
+ // Decode existing value
+ HashFieldValue field_value;
+ if (!HashFieldValue::Decode(raw_value, &field_value)) {
+ return rocksdb::Status::Corruption("failed to decode hash field value");
+ }
+
+ // Check if field is already expired
+ if (field_value.IsExpired()) {
+ results->push_back(FieldPersistResult::kFieldNotFound); // Treat as
non-existent
+ continue;
+ }
+
+ // Check if field has expiration
+ if (field_value.expire == 0) {
+ results->push_back(FieldPersistResult::kNotVolatile); // Field exists
but has no TTL
+ continue;
+ }
+
+ // Remove expiration - create a new HashFieldValue with the value and no
expiration
+ HashFieldValue new_field_value(field_value.value, 0);
+ std::string encoded_value;
+ new_field_value.Encode(&encoded_value);
+ s = batch->Put(sub_key, encoded_value);
+ if (!s.ok()) return s;
+ has_updates = true;
+ results->push_back(FieldPersistResult::kPersisted); // Expiration removed
successfully
+ }
+
+ if (has_updates) {
+ return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+ }
+ return rocksdb::Status::OK();
+}
+
+void Hash::asyncRepairHash(const std::string &ns_key, const Slice &field,
const HashMetadata &metadata) const {
Review Comment:
It's not a good idea to spawn a thread per expired hash field. Perhaps we
can reuse the existing task runner, or have a fixed thread pool for this
purpose.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]