This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 31644c1  feat: add update ttl compaction operation (#760)
31644c1 is described below

commit 31644c15c50d74c60b6ea017f668a6c1b5c62f35
Author: zhao liwei <[email protected]>
AuthorDate: Wed Jun 16 16:24:40 2021 +0800

    feat: add update ttl compaction operation (#760)
---
 src/server/compaction_filter_rule.h           |   4 +
 src/server/compaction_operation.cpp           |  47 +++++++
 src/server/compaction_operation.h             |  31 +++++
 src/server/test/compaction_operation_test.cpp | 171 ++++++++++++++++++++++++++
 4 files changed, 253 insertions(+)

diff --git a/src/server/compaction_filter_rule.h b/src/server/compaction_filter_rule.h
index 8927839..bc61039 100644
--- a/src/server/compaction_filter_rule.h
+++ b/src/server/compaction_filter_rule.h
@@ -67,6 +67,8 @@ private:
 
     FRIEND_TEST(hashkey_pattern_rule_test, match);
     FRIEND_TEST(delete_key_test, filter);
+    FRIEND_TEST(update_ttl_test, filter);
+    FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
 };
 
 class sortkey_pattern_rule : public compaction_filter_rule
@@ -83,6 +85,7 @@ private:
     string_match_type match_type;
 
     FRIEND_TEST(sortkey_pattern_rule_test, match);
+    FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
 };
 
 class ttl_range_rule : public compaction_filter_rule
@@ -101,6 +104,7 @@ private:
     uint32_t pegasus_data_version;
 
     FRIEND_TEST(ttl_range_rule_test, match);
+    FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
 };
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/compaction_operation.cpp b/src/server/compaction_operation.cpp
index bb86e0c..895c75d 100644
--- a/src/server/compaction_operation.cpp
+++ b/src/server/compaction_operation.cpp
@@ -17,6 +17,8 @@
  * under the License.
  */
 
+#include "base/pegasus_utils.h"
+#include "base/pegasus_value_schema.h"
 #include "compaction_operation.h"
 
 namespace pegasus {
@@ -55,5 +57,50 @@ bool delete_key::filter(const std::string &hash_key,
     }
     return true;
 }
+
+update_ttl::update_ttl(filter_rules &&rules, uint32_t pegasus_data_version)
+    : compaction_operation(std::move(rules), pegasus_data_version)
+{ // only forwards to the base class; `type` and `value` are not assigned here
+}
+
+bool update_ttl::filter(const std::string &hash_key,
+                        const std::string &sort_key,
+                        const rocksdb::Slice &existing_value,
+                        std::string *new_value,
+                        bool *value_changed) const
+{ // rewrites the expire ts stored in the value when all rules match; never returns true
+    if (!all_rules_match(hash_key, sort_key, existing_value)) {
+        return false; // all_rules_match() returned false: outputs are left untouched
+    }
+
+    uint32_t new_ts = 0;
+    switch (type) {
+    case update_ttl_op_type::UTOT_FROM_NOW:
+        new_ts = utils::epoch_now() + value; // `value` added to epoch_now()
+        break;
+    case update_ttl_op_type::UTOT_FROM_CURRENT: {
+        auto ttl =
+            pegasus_extract_expire_ts(pegasus_data_version, 
utils::to_string_view(existing_value));
+        if (ttl == 0) {
+            return false; // extracted expire ts is 0: the value is not modified
+        }
+        new_ts = value + ttl; // `value` added to the expire ts already stored
+        break;
+    }
+    case update_ttl_op_type::UTOT_TIMESTAMP:
+        // convert `value` to seconds since 2016.01.01-00:00:00 GMT
+        new_ts = value - pegasus::utils::epoch_begin;
+        break;
+    default:
+        ddebug("invalid update ttl operation type");
+        return false;
+    }
+
+    *new_value = existing_value.ToString();
+    pegasus_update_expire_ts(pegasus_data_version, *new_value, new_ts);
+    *value_changed = true;
+    return false; // false even when the value was rewritten (presumably "keep the record" - confirm)
+}
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/compaction_operation.h b/src/server/compaction_operation.h
index a4e6305..d4e4285 100644
--- a/src/server/compaction_operation.h
+++ b/src/server/compaction_operation.h
@@ -70,6 +70,37 @@ public:
 
 private:
     FRIEND_TEST(delete_key_test, filter);
+    FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
+};
+
+enum update_ttl_op_type
+{
+    // update ttl to epoch_now() + value
+    UTOT_FROM_NOW,
+    // update ttl to {current ttl in rocksdb value} + value
+    UTOT_FROM_CURRENT,
+    // update ttl to value - epoch_begin, which means this key will expire 
at the
+    // timestamp of {value}
+    UTOT_TIMESTAMP,
+    UTOT_INVALID, // not one of the operations handled by update_ttl::filter
+};
+
+class update_ttl : public compaction_operation
+{ // compaction operation that rewrites the expire timestamp of records matching its rules
+public:
+    update_ttl(filter_rules &&rules, uint32_t pegasus_data_version);
+
+    bool filter(const std::string &hash_key,
+                const std::string &sort_key,
+                const rocksdb::Slice &existing_value,
+                std::string *new_value,
+                bool *value_changed) const;
+
+private:
+    update_ttl_op_type type = UTOT_INVALID; // computation filter() applies; invalid until set
+    uint32_t value = 0;                     // operand of that computation
+
+    FRIEND_TEST(update_ttl_test, filter);
+};
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/test/compaction_operation_test.cpp b/src/server/test/compaction_operation_test.cpp
index b7140aa..9303872 100644
--- a/src/server/test/compaction_operation_test.cpp
+++ b/src/server/test/compaction_operation_test.cpp
@@ -20,11 +20,104 @@
 #include <gtest/gtest.h>
 #include "server/compaction_operation.h"
 #include "server/compaction_filter_rule.h"
+#include "base/pegasus_value_schema.h"
+#include "base/pegasus_utils.h"
 #include <dsn/utility/smart_pointers.h>
 
 namespace pegasus {
 namespace server {
 
+TEST(compaction_filter_operation_test, all_rules_match)
+{
+    struct test_case
+    {
+        bool all_match;
+        std::string hashkey;
+        std::string sortkey;
+        int32_t expire_ttl; // added to the current time to form the value's expire ts
+        // hashkey_rule
+        std::string hashkey_pattern;
+        string_match_type hashkey_match_type;
+        // sortkey_rule
+        std::string sortkey_pattern;
+        string_match_type sortkey_match_type;
+        // ttl_range_rule
+        int32_t start_ttl;
+        int32_t stop_ttl;
+    } tests[] = {
+        {true, // hashkey, sortkey and ttl all satisfy their rules
+         "hashkey",
+         "sortkey",
+         1000,
+         "hashkey",
+         SMT_MATCH_ANYWHERE,
+         "sortkey",
+         SMT_MATCH_ANYWHERE,
+         100,
+         2000},
+        {false, // hashkey "hash_key" differs from the pattern
+         "hash_key",
+         "sortkey",
+         1000,
+         "hashkey",
+         SMT_MATCH_ANYWHERE,
+         "sortkey",
+         SMT_MATCH_ANYWHERE,
+         100,
+         2000},
+        {false, // sortkey "sort_key" differs from the pattern
+         "hashkey",
+         "sort_key",
+         1000,
+         "hashkey",
+         SMT_MATCH_ANYWHERE,
+         "sortkey",
+         SMT_MATCH_ANYWHERE,
+         100,
+         2000},
+        {false, // expire_ttl 10000 lies beyond stop_ttl 2000
+         "hashkey",
+         "sortkey",
+         10000,
+         "hashkey",
+         SMT_MATCH_ANYWHERE,
+         "sortkey",
+         SMT_MATCH_ANYWHERE,
+         100,
+         2000},
+    };
+
+    uint32_t data_version = 1;
+    filter_rules rules;
+    rules.push_back(dsn::make_unique<hashkey_pattern_rule>());
+    rules.push_back(dsn::make_unique<sortkey_pattern_rule>());
+    rules.push_back(dsn::make_unique<ttl_range_rule>(data_version));
+    delete_key update_operation(std::move(rules), data_version); // only all_rules_match() is called on it
+    pegasus_value_generator gen;
+    auto now_ts = utils::epoch_now();
+    for (const auto &test : tests) {
+        auto hash_rule = static_cast<hashkey_pattern_rule 
*>(update_operation.rules[0].get());
+        auto sort_rule = static_cast<sortkey_pattern_rule 
*>(update_operation.rules[1].get());
+        auto ttl_rule = static_cast<ttl_range_rule 
*>(update_operation.rules[2].get());
+
+        hash_rule->pattern = test.hashkey_pattern;
+        hash_rule->match_type = test.hashkey_match_type;
+        sort_rule->pattern = test.sortkey_pattern;
+        sort_rule->match_type = test.sortkey_match_type;
+        ttl_rule->start_ttl = test.start_ttl;
+        ttl_rule->stop_ttl = test.stop_ttl;
+
+        rocksdb::SliceParts svalue =
+            gen.generate_value(data_version, "", test.expire_ttl + now_ts, 0);
+        ASSERT_EQ(update_operation.all_rules_match(test.hashkey, test.sortkey, 
svalue.parts[0]),
+                  test.all_match);
+    }
+
+    // all_rules_match is expected to return false when the operation has no rules
+    update_ttl no_rule_operation({}, data_version);
+    ASSERT_EQ(no_rule_operation.all_rules_match("hash", "sort", 
rocksdb::Slice()), false);
+}
+
 TEST(delete_key_test, filter)
 {
     struct test_case
@@ -51,5 +144,83 @@ TEST(delete_key_test, filter)
                   delete_operation.filter(test.hashkey, "", rocksdb::Slice(), 
nullptr, nullptr));
     }
 }
+
+TEST(update_ttl_test, filter)
+{
+    struct test_case
+    {
+        bool value_changed;
+        uint32_t expect_ts; // checked per op_type below; for UTOT_FROM_NOW it is an offset from now
+        std::string hashkey;
+        uint32_t expire_ts; // expire ts written into the generated input value
+        // hashkey_rule
+        std::string hashkey_pattern;
+        string_match_type hashkey_match_type;
+        // operation
+        update_ttl_op_type op_type;
+        uint32_t value;
+    } tests[] = {
+        {true, 1000, "hashkey", 300, "hashkey", SMT_MATCH_ANYWHERE, 
UTOT_FROM_NOW, 1000}, // new ts is 1000 past the current time
+        {false, 0, "hashkey", 0, "hashkey", SMT_MATCH_ANYWHERE, 
UTOT_FROM_CURRENT, 1000}, // stored expire ts is 0: value stays unchanged
+        {true, 1300, "hashkey", 300, "hashkey", SMT_MATCH_ANYWHERE, 
UTOT_FROM_CURRENT, 1000}, // 300 + 1000
+        {true, // timestamp input: stored ts is value - epoch_begin
+         1000 + pegasus::utils::epoch_begin,
+         "hashkey",
+         300,
+         "hashkey",
+         SMT_MATCH_ANYWHERE,
+         UTOT_TIMESTAMP,
+         1000 + pegasus::utils::epoch_begin},
+        {false, // hashkey does not satisfy the pattern "hashkey111": nothing changes
+         1000 + pegasus::utils::epoch_begin,
+         "hashkey",
+         300,
+         "hashkey111",
+         SMT_MATCH_ANYWHERE,
+         UTOT_TIMESTAMP,
+         1000 + pegasus::utils::epoch_begin},
+    };
+
+    uint32_t data_version = 1;
+    filter_rules rules;
+    rules.push_back(dsn::make_unique<hashkey_pattern_rule>());
+    update_ttl update_operation(std::move(rules), data_version);
+    pegasus_value_generator gen;
+    for (const auto &test : tests) {
+        auto hash_rule = static_cast<hashkey_pattern_rule 
*>(update_operation.rules.begin()->get());
+        hash_rule->pattern = test.hashkey_pattern;
+        hash_rule->match_type = test.hashkey_match_type;
+        update_operation.value = test.value;
+        update_operation.type = test.op_type;
+
+        std::string new_value;
+        bool value_changed = false;
+        rocksdb::SliceParts svalue = gen.generate_value(data_version, "", 
test.expire_ts, 0);
+        uint32_t before_ts = utils::epoch_now();
+        ASSERT_EQ(
+            false, // update_ttl::filter is expected to return false in every case
+            update_operation.filter(test.hashkey, "", svalue.parts[0], 
&new_value, &value_changed));
+        ASSERT_EQ(test.value_changed, value_changed);
+        if (value_changed) {
+            uint32_t new_ts = pegasus_extract_expire_ts(data_version, 
new_value);
+            switch (test.op_type) {
+            case UTOT_TIMESTAMP:
+                ASSERT_EQ(new_ts + pegasus::utils::epoch_begin, 
test.expect_ts);
+                break;
+            case UTOT_FROM_CURRENT:
+                ASSERT_EQ(new_ts, test.expect_ts);
+                break;
+            case UTOT_FROM_NOW: {
+                uint32_t after_ts = utils::epoch_now();
+                ASSERT_GE(new_ts, test.expect_ts + before_ts); // filter ran between the two
+                ASSERT_LE(new_ts, test.expect_ts + after_ts);  // epoch_now() samples
+                break;
+            }
+            default:
+                break;
+            }
+        }
+    }
+}
 } // namespace server
 } // namespace pegasus

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to