This is an automated email from the ASF dual-hosted git repository.
yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 31644c1 feat: add update ttl compaction operation (#760)
31644c1 is described below
commit 31644c15c50d74c60b6ea017f668a6c1b5c62f35
Author: zhao liwei <[email protected]>
AuthorDate: Wed Jun 16 16:24:40 2021 +0800
feat: add update ttl compaction operation (#760)
---
src/server/compaction_filter_rule.h | 4 +
src/server/compaction_operation.cpp | 47 +++++++
src/server/compaction_operation.h | 31 +++++
src/server/test/compaction_operation_test.cpp | 171 ++++++++++++++++++++++++++
4 files changed, 253 insertions(+)
diff --git a/src/server/compaction_filter_rule.h
b/src/server/compaction_filter_rule.h
index 8927839..bc61039 100644
--- a/src/server/compaction_filter_rule.h
+++ b/src/server/compaction_filter_rule.h
@@ -67,6 +67,8 @@ private:
FRIEND_TEST(hashkey_pattern_rule_test, match);
FRIEND_TEST(delete_key_test, filter);
+ FRIEND_TEST(update_ttl_test, filter);
+ FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
};
class sortkey_pattern_rule : public compaction_filter_rule
@@ -83,6 +85,7 @@ private:
string_match_type match_type;
FRIEND_TEST(sortkey_pattern_rule_test, match);
+ FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
};
class ttl_range_rule : public compaction_filter_rule
@@ -101,6 +104,7 @@ private:
uint32_t pegasus_data_version;
FRIEND_TEST(ttl_range_rule_test, match);
+ FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
};
} // namespace server
} // namespace pegasus
diff --git a/src/server/compaction_operation.cpp
b/src/server/compaction_operation.cpp
index bb86e0c..895c75d 100644
--- a/src/server/compaction_operation.cpp
+++ b/src/server/compaction_operation.cpp
@@ -17,6 +17,8 @@
* under the License.
*/
+#include "base/pegasus_utils.h"
+#include "base/pegasus_value_schema.h"
#include "compaction_operation.h"
namespace pegasus {
@@ -55,5 +57,50 @@ bool delete_key::filter(const std::string &hash_key,
}
return true;
}
+
+update_ttl::update_ttl(filter_rules &&rules, uint32_t pegasus_data_version)
+ : compaction_operation(std::move(rules), pegasus_data_version)
+{
+}
+
+bool update_ttl::filter(const std::string &hash_key,
+ const std::string &sort_key,
+ const rocksdb::Slice &existing_value,
+ std::string *new_value,
+ bool *value_changed) const
+{
+ if (!all_rules_match(hash_key, sort_key, existing_value)) {
+ return false;
+ }
+
+ uint32_t new_ts = 0;
+ switch (type) {
+ case update_ttl_op_type::UTOT_FROM_NOW:
+ new_ts = utils::epoch_now() + value;
+ break;
+ case update_ttl_op_type::UTOT_FROM_CURRENT: {
+ auto ttl =
+ pegasus_extract_expire_ts(pegasus_data_version,
utils::to_string_view(existing_value));
+ if (ttl == 0) {
+ return false;
+ }
+ new_ts = value + ttl;
+ break;
+ }
+ case update_ttl_op_type::UTOT_TIMESTAMP:
+ // make it's seconds since 2016.01.01-00:00:00 GMT
+ new_ts = value - pegasus::utils::epoch_begin;
+ break;
+ default:
+ ddebug("invalid update ttl operation type");
+ return false;
+ }
+
+ *new_value = existing_value.ToString();
+ pegasus_update_expire_ts(pegasus_data_version, *new_value, new_ts);
+ *value_changed = true;
+ return false;
+}
+
} // namespace server
} // namespace pegasus
diff --git a/src/server/compaction_operation.h
b/src/server/compaction_operation.h
index a4e6305..d4e4285 100644
--- a/src/server/compaction_operation.h
+++ b/src/server/compaction_operation.h
@@ -70,6 +70,37 @@ public:
private:
FRIEND_TEST(delete_key_test, filter);
+ FRIEND_TEST(compaction_filter_operation_test, all_rules_match);
+};
+
+enum update_ttl_op_type
+{
+ // update ttl to epoch_now() + value
+ UTOT_FROM_NOW,
+ // update ttl to {current ttl in rocksdb value} + value
+ UTOT_FROM_CURRENT,
+ // update ttl to value - time(nullptr), which means this key will expire
at the
+ // timestamp of {value}
+ UTOT_TIMESTAMP,
+ UTOT_INVALID,
+};
+
+class update_ttl : public compaction_operation
+{
+public:
+ update_ttl(filter_rules &&rules, uint32_t pegasus_data_version);
+
+ bool filter(const std::string &hash_key,
+ const std::string &sort_key,
+ const rocksdb::Slice &existing_value,
+ std::string *new_value,
+ bool *value_changed) const;
+
+private:
+ update_ttl_op_type type;
+ uint32_t value;
+
+ FRIEND_TEST(update_ttl_test, filter);
};
} // namespace server
} // namespace pegasus
diff --git a/src/server/test/compaction_operation_test.cpp
b/src/server/test/compaction_operation_test.cpp
index b7140aa..9303872 100644
--- a/src/server/test/compaction_operation_test.cpp
+++ b/src/server/test/compaction_operation_test.cpp
@@ -20,11 +20,104 @@
#include <gtest/gtest.h>
#include "server/compaction_operation.h"
#include "server/compaction_filter_rule.h"
+#include "base/pegasus_value_schema.h"
+#include "base/pegasus_utils.h"
#include <dsn/utility/smart_pointers.h>
namespace pegasus {
namespace server {
+TEST(compaction_filter_operation_test, all_rules_match)
+{
+ struct test_case
+ {
+ bool all_match;
+ std::string hashkey;
+ std::string sortkey;
+ int32_t expire_ttl;
+ // hashkey_rule
+ std::string hashkey_pattern;
+ string_match_type hashkey_match_type;
+ // sortkey_rule
+ std::string sortkey_pattern;
+ string_match_type sortkey_match_type;
+ // ttl_range_rule
+ int32_t start_ttl;
+ int32_t stop_ttl;
+ } tests[] = {
+ {true,
+ "hashkey",
+ "sortkey",
+ 1000,
+ "hashkey",
+ SMT_MATCH_ANYWHERE,
+ "sortkey",
+ SMT_MATCH_ANYWHERE,
+ 100,
+ 2000},
+ {false,
+ "hash_key",
+ "sortkey",
+ 1000,
+ "hashkey",
+ SMT_MATCH_ANYWHERE,
+ "sortkey",
+ SMT_MATCH_ANYWHERE,
+ 100,
+ 2000},
+ {false,
+ "hashkey",
+ "sort_key",
+ 1000,
+ "hashkey",
+ SMT_MATCH_ANYWHERE,
+ "sortkey",
+ SMT_MATCH_ANYWHERE,
+ 100,
+ 2000},
+ {false,
+ "hashkey",
+ "sortkey",
+ 10000,
+ "hashkey",
+ SMT_MATCH_ANYWHERE,
+ "sortkey",
+ SMT_MATCH_ANYWHERE,
+ 100,
+ 2000},
+ };
+
+ uint32_t data_version = 1;
+ filter_rules rules;
+ rules.push_back(dsn::make_unique<hashkey_pattern_rule>());
+ rules.push_back(dsn::make_unique<sortkey_pattern_rule>());
+ rules.push_back(dsn::make_unique<ttl_range_rule>(data_version));
+ delete_key update_operation(std::move(rules), data_version);
+ pegasus_value_generator gen;
+ auto now_ts = utils::epoch_now();
+ for (const auto &test : tests) {
+ auto hash_rule = static_cast<hashkey_pattern_rule
*>(update_operation.rules[0].get());
+ auto sort_rule = static_cast<sortkey_pattern_rule
*>(update_operation.rules[1].get());
+ auto ttl_rule = static_cast<ttl_range_rule
*>(update_operation.rules[2].get());
+
+ hash_rule->pattern = test.hashkey_pattern;
+ hash_rule->match_type = test.hashkey_match_type;
+ sort_rule->pattern = test.sortkey_pattern;
+ sort_rule->match_type = test.sortkey_match_type;
+ ttl_rule->start_ttl = test.start_ttl;
+ ttl_rule->stop_ttl = test.stop_ttl;
+
+ rocksdb::SliceParts svalue =
+ gen.generate_value(data_version, "", test.expire_ttl + now_ts, 0);
+ ASSERT_EQ(update_operation.all_rules_match(test.hashkey, test.sortkey,
svalue.parts[0]),
+ test.all_match);
+ }
+
+ // all_rules_match will return false if there is no rule in this operation
+ update_ttl no_rule_operation({}, data_version);
+ ASSERT_EQ(no_rule_operation.all_rules_match("hash", "sort",
rocksdb::Slice()), false);
+}
+
TEST(delete_key_test, filter)
{
struct test_case
@@ -51,5 +144,83 @@ TEST(delete_key_test, filter)
delete_operation.filter(test.hashkey, "", rocksdb::Slice(),
nullptr, nullptr));
}
}
+
+TEST(update_ttl_test, filter)
+{
+ struct test_case
+ {
+ bool value_changed;
+ uint32_t expect_ts;
+ std::string hashkey;
+ uint32_t expire_ts;
+ // hashkey_rule
+ std::string hashkey_pattern;
+ string_match_type hashkey_match_type;
+ // operation
+ update_ttl_op_type op_type;
+ uint32_t value;
+ } tests[] = {
+ {true, 1000, "hashkey", 300, "hashkey", SMT_MATCH_ANYWHERE,
UTOT_FROM_NOW, 1000},
+ {false, 0, "hashkey", 0, "hashkey", SMT_MATCH_ANYWHERE,
UTOT_FROM_CURRENT, 1000},
+ {true, 1300, "hashkey", 300, "hashkey", SMT_MATCH_ANYWHERE,
UTOT_FROM_CURRENT, 1000},
+ {true,
+ 1000 + pegasus::utils::epoch_begin,
+ "hashkey",
+ 300,
+ "hashkey",
+ SMT_MATCH_ANYWHERE,
+ UTOT_TIMESTAMP,
+ 1000 + pegasus::utils::epoch_begin},
+ {false,
+ 1000 + pegasus::utils::epoch_begin,
+ "hashkey",
+ 300,
+ "hashkey111",
+ SMT_MATCH_ANYWHERE,
+ UTOT_TIMESTAMP,
+ 1000 + pegasus::utils::epoch_begin},
+ };
+
+ uint32_t data_version = 1;
+ filter_rules rules;
+ rules.push_back(dsn::make_unique<hashkey_pattern_rule>());
+ update_ttl update_operation(std::move(rules), data_version);
+ pegasus_value_generator gen;
+ for (const auto &test : tests) {
+ auto hash_rule = static_cast<hashkey_pattern_rule
*>(update_operation.rules.begin()->get());
+ hash_rule->pattern = test.hashkey_pattern;
+ hash_rule->match_type = test.hashkey_match_type;
+ update_operation.value = test.value;
+ update_operation.type = test.op_type;
+
+ std::string new_value;
+ bool value_changed = false;
+ rocksdb::SliceParts svalue = gen.generate_value(data_version, "",
test.expire_ts, 0);
+ uint32_t before_ts = utils::epoch_now();
+ ASSERT_EQ(
+ false,
+ update_operation.filter(test.hashkey, "", svalue.parts[0],
&new_value, &value_changed));
+ ASSERT_EQ(test.value_changed, value_changed);
+ if (value_changed) {
+ uint32_t new_ts = pegasus_extract_expire_ts(data_version,
new_value);
+ switch (test.op_type) {
+ case UTOT_TIMESTAMP:
+ ASSERT_EQ(new_ts + pegasus::utils::epoch_begin,
test.expect_ts);
+ break;
+ case UTOT_FROM_CURRENT:
+ ASSERT_EQ(new_ts, test.expect_ts);
+ break;
+ case UTOT_FROM_NOW: {
+ uint32_t after_ts = utils::epoch_now();
+ ASSERT_GE(new_ts, test.expect_ts + before_ts);
+ ASSERT_LE(new_ts, test.expect_ts + after_ts);
+ break;
+ }
+ default:
+ break;
+ }
+ }
+ }
+}
} // namespace server
} // namespace pegasus
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]