This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new adcc62b feat(split): gc useless data after partition split (#698)
adcc62b is described below
commit adcc62bf2db258279d19b69b58617aefd2b0a78d
Author: HeYuchen <[email protected]>
AuthorDate: Tue Mar 23 10:00:57 2021 +0800
feat(split): gc useless data after partition split (#698)
---
rdsn | 2 +-
src/base/pegasus_const.cpp | 3 ++
src/base/pegasus_const.h | 2 ++
src/base/pegasus_key_schema.h | 25 ++++++++++++----
src/server/key_ttl_compaction_filter.h | 55 ++++++++++++++++++++++++++++++----
src/server/pegasus_server_impl.cpp | 26 ++++++++++++++--
src/server/pegasus_server_impl.h | 3 ++
7 files changed, 102 insertions(+), 14 deletions(-)
diff --git a/rdsn b/rdsn
index 1405483..290a811 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 140548388e62700b3f018c9f6ccaf2f0b396df35
+Subproject commit 290a8116d047d2cb2ee3d74026d796d0e4d228df
diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp
index c80b1e6..da529b8 100644
--- a/src/base/pegasus_const.cpp
+++ b/src/base/pegasus_const.cpp
@@ -88,4 +88,7 @@ const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD("replica.slow_query_threshold
/// time threshold of each rocksdb iteration
const std::string
ROCKSDB_ITERATION_THRESHOLD_TIME_MS("replica.rocksdb_iteration_threshold_time_ms");
+
+/// true means compaction and scan will validate partition_hash; false means they will not
+const std::string
SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash");
} // namespace pegasus
diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h
index 4f4814d..4f131eb 100644
--- a/src/base/pegasus_const.h
+++ b/src/base/pegasus_const.h
@@ -62,4 +62,6 @@ extern const std::string PEGASUS_CLUSTER_SECTION_NAME;
extern const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD;
extern const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS;
+
+extern const std::string SPLIT_VALIDATE_PARTITION_HASH;
} // namespace pegasus
diff --git a/src/base/pegasus_key_schema.h b/src/base/pegasus_key_schema.h
index 59fecbf..b894c6d 100644
--- a/src/base/pegasus_key_schema.h
+++ b/src/base/pegasus_key_schema.h
@@ -146,22 +146,23 @@ pegasus_restore_key(const ::dsn::blob &key, std::string &hash_key, std::string &
}
}
-// calculate hash from rocksdb key.
-inline uint64_t pegasus_key_hash(const ::dsn::blob &key)
+// calculate hash from rocksdb key or rocksdb slice
+template <typename T>
+inline uint64_t pegasus_key_hash(const T &key)
{
- dassert(key.length() >= 2, "key length must be no less than 2");
+ dassert(key.size() >= 2, "key length must be no less than 2");
// hash_key_len is in big endian
uint16_t hash_key_len = be16toh(*(int16_t *)(key.data()));
if (hash_key_len > 0) {
// hash_key_len > 0, compute hash from hash_key
- dassert(key.length() >= 2 + hash_key_len,
+ dassert(key.size() >= 2 + hash_key_len,
"key length must be no less than (2 + hash_key_len)");
- return dsn::utils::crc64_calc(key.buffer_ptr() + 2, hash_key_len, 0);
+ return dsn::utils::crc64_calc(key.data() + 2, hash_key_len, 0);
} else {
// hash_key_len == 0, compute hash from sort_key
- return dsn::utils::crc64_calc(key.buffer_ptr() + 2, key.length() - 2,
0);
+ return dsn::utils::crc64_calc(key.data() + 2, key.size() - 2, 0);
}
}
@@ -171,4 +172,16 @@ inline uint64_t pegasus_hash_key_hash(const ::dsn::blob &hash_key)
return dsn::utils::crc64_calc(hash_key.data(), hash_key.length(), 0);
}
+// check whether the key should be served by this partition
+// Notice: callers must ensure partition_version is non-negative (>= 0) before calling this function
+template <class T>
+inline bool check_pegasus_key_hash(const T &key, int32_t pidx, int32_t
partition_version)
+{
+ auto target_pidx = pegasus_key_hash(key) & partition_version;
+ if (dsn_unlikely(target_pidx != pidx)) {
+ return false;
+ }
+ return true;
+}
+
} // namespace pegasus
diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h
index f6f410d..4f26844 100644
--- a/src/server/key_ttl_compaction_filter.h
+++ b/src/server/key_ttl_compaction_filter.h
@@ -25,6 +25,7 @@
#include <rocksdb/merge_operator.h>
#include "base/pegasus_utils.h"
+#include "base/pegasus_key_schema.h"
#include "base/pegasus_value_schema.h"
namespace pegasus {
@@ -33,8 +34,18 @@ namespace server {
class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
{
public:
- KeyWithTTLCompactionFilter(uint32_t pegasus_data_version, uint32_t
default_ttl, bool enabled)
- : _pegasus_data_version(pegasus_data_version),
_default_ttl(default_ttl), _enabled(enabled)
+ KeyWithTTLCompactionFilter(uint32_t pegasus_data_version,
+ uint32_t default_ttl,
+ bool enabled,
+ int32_t pidx,
+ int32_t partition_version,
+ bool validate_hash)
+ : _pegasus_data_version(pegasus_data_version),
+ _default_ttl(default_ttl),
+ _enabled(enabled),
+ _partition_index(pidx),
+ _partition_version(partition_version),
+ _validate_partition_hash(validate_hash)
{
}
@@ -58,16 +69,30 @@ public:
*value_changed = true;
return false;
}
- return check_if_ts_expired(utils::epoch_now(), expire_ts);
+ return check_if_ts_expired(utils::epoch_now(), expire_ts) ||
check_if_stale_split_data(key);
}
const char *Name() const override { return "KeyWithTTLCompactionFilter"; }
+ // Check if the record is stale after a partition split, which splits the partition into two
+ // halves. The stale record belongs to the other half.
+ bool check_if_stale_split_data(const rocksdb::Slice &key) const
+ {
+ if (!_validate_partition_hash || key.size() < 2 || _partition_version
< 0 ||
+ _partition_index > _partition_version) {
+ return false;
+ }
+ return !check_pegasus_key_hash(key, _partition_index,
_partition_version);
+ }
+
private:
uint32_t _pegasus_data_version;
uint32_t _default_ttl;
bool _enabled; // only process filtering when _enabled == true
mutable pegasus_value_generator _gen;
+ int32_t _partition_index;
+ int32_t _partition_version;
+ bool _validate_partition_hash;
};
class KeyWithTTLCompactionFilterFactory : public
rocksdb::CompactionFilterFactory
@@ -79,8 +104,13 @@ public:
std::unique_ptr<rocksdb::CompactionFilter>
CreateCompactionFilter(const rocksdb::CompactionFilter::Context &
/*context*/) override
{
- return std::unique_ptr<KeyWithTTLCompactionFilter>(new
KeyWithTTLCompactionFilter(
- _pegasus_data_version.load(), _default_ttl.load(),
_enabled.load()));
+ return std::unique_ptr<KeyWithTTLCompactionFilter>(
+ new KeyWithTTLCompactionFilter(_pegasus_data_version.load(),
+ _default_ttl.load(),
+ _enabled.load(),
+ _partition_index.load(),
+ _partition_version.load(),
+ _validate_partition_hash.load()));
}
const char *Name() const override { return
"KeyWithTTLCompactionFilterFactory"; }
@@ -90,11 +120,26 @@ public:
}
void EnableFilter() { _enabled.store(true, std::memory_order_release); }
void SetDefaultTTL(uint32_t ttl) { _default_ttl.store(ttl,
std::memory_order_release); }
+ void SetValidatePartitionHash(bool validate_hash)
+ {
+ _validate_partition_hash.store(validate_hash,
std::memory_order_release);
+ }
+ void SetPartitionIndex(int32_t pidx)
+ {
+ _partition_index.store(pidx, std::memory_order_release);
+ }
+ void SetPartitionVersion(int32_t partition_version)
+ {
+ _partition_version.store(partition_version, std::memory_order_release);
+ }
private:
std::atomic<uint32_t> _pegasus_data_version;
std::atomic<uint32_t> _default_ttl;
std::atomic_bool _enabled; // only process filtering when _enabled == true
+ std::atomic<int32_t> _partition_index{0};
+ std::atomic<int32_t> _partition_version{-1};
+ std::atomic_bool _validate_partition_hash{false};
};
} // namespace server
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 38b15d7..7816bcd 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -1458,6 +1458,8 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
// only enable filter after correct pegasus_data_version set
_key_ttl_compaction_filter_factory->SetPegasusDataVersion(_pegasus_data_version);
+
_key_ttl_compaction_filter_factory->SetPartitionIndex(_gpid.get_partition_index());
+
_key_ttl_compaction_filter_factory->SetPartitionVersion(_gpid.get_partition_index()
- 1);
_key_ttl_compaction_filter_factory->EnableFilter();
parse_checkpoints();
@@ -2294,6 +2296,7 @@ void pegasus_server_impl::update_app_envs(const std::map<std::string, std::strin
update_checkpoint_reserve(envs);
update_slow_query_threshold(envs);
update_rocksdb_iteration_threshold(envs);
+ update_validate_partition_hash(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
}
@@ -2310,6 +2313,7 @@ void pegasus_server_impl::update_app_envs_before_open_db(
update_checkpoint_reserve(envs);
update_slow_query_threshold(envs);
update_rocksdb_iteration_threshold(envs);
+ update_validate_partition_hash(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
}
@@ -2438,6 +2442,25 @@ void pegasus_server_impl::update_rocksdb_iteration_threshold(
}
}
+void pegasus_server_impl::update_validate_partition_hash(
+ const std::map<std::string, std::string> &envs)
+{
+ bool new_value = false;
+ auto iter = envs.find(SPLIT_VALIDATE_PARTITION_HASH);
+ if (iter != envs.end()) {
+ if (!dsn::buf2bool(iter->second, new_value)) {
+ derror_replica("{}={} is invalid.", iter->first, iter->second);
+ return;
+ }
+ }
+ if (new_value != _validate_partition_hash) {
+ ddebug_replica(
+ "update '_validate_partition_hash' from {} to {}",
_validate_partition_hash, new_value);
+ _validate_partition_hash = new_value;
+
_key_ttl_compaction_filter_factory->SetValidatePartitionHash(_validate_partition_hash);
+ }
+}
+
bool pegasus_server_impl::parse_compression_types(
const std::string &config, std::vector<rocksdb::CompressionType>
&compression_per_level)
{
@@ -2750,8 +2773,7 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version)
int32_t old_partition_version =
_partition_version.exchange(partition_version);
ddebug_replica(
"update partition version from {} to {}", old_partition_version,
partition_version);
-
- // TODO(heyuchen): set filter _partition_version in further pr
+ _key_ttl_compaction_filter_factory->SetPartitionVersion(partition_version);
}
::dsn::error_code pegasus_server_impl::flush_all_family_columns(bool wait)
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 5785624..2b314c3 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -262,6 +262,8 @@ private:
void update_rocksdb_iteration_threshold(const std::map<std::string,
std::string> &envs);
+ void update_validate_partition_hash(const std::map<std::string,
std::string> &envs);
+
// return true if parse compression types 'config' success, otherwise return false.
// 'compression_per_level' will not be changed if parse failed.
bool parse_compression_types(const std::string &config,
@@ -402,6 +404,7 @@ private:
pegasus_manual_compact_service _manual_compact_svc;
std::atomic<int32_t> _partition_version;
+ bool _validate_partition_hash{false};
dsn::replication::ingestion_status::type _ingestion_status{
dsn::replication::ingestion_status::IS_INVALID};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]