This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new adcc62b  feat(split): gc useless data after partition split (#698)
adcc62b is described below

commit adcc62bf2db258279d19b69b58617aefd2b0a78d
Author: HeYuchen <[email protected]>
AuthorDate: Tue Mar 23 10:00:57 2021 +0800

    feat(split): gc useless data after partition split (#698)
---
 rdsn                                   |  2 +-
 src/base/pegasus_const.cpp             |  3 ++
 src/base/pegasus_const.h               |  2 ++
 src/base/pegasus_key_schema.h          | 25 ++++++++++++----
 src/server/key_ttl_compaction_filter.h | 55 ++++++++++++++++++++++++++++++----
 src/server/pegasus_server_impl.cpp     | 26 ++++++++++++++--
 src/server/pegasus_server_impl.h       |  3 ++
 7 files changed, 102 insertions(+), 14 deletions(-)

diff --git a/rdsn b/rdsn
index 1405483..290a811 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 140548388e62700b3f018c9f6ccaf2f0b396df35
+Subproject commit 290a8116d047d2cb2ee3d74026d796d0e4d228df
diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp
index c80b1e6..da529b8 100644
--- a/src/base/pegasus_const.cpp
+++ b/src/base/pegasus_const.cpp
@@ -88,4 +88,7 @@ const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD("replica.slow_query_threshold
 /// time threshold of each rocksdb iteration
 const std::string
     
ROCKSDB_ITERATION_THRESHOLD_TIME_MS("replica.rocksdb_iteration_threshold_time_ms");
+
+/// true means compaction and scan will validate partition_hash, otherwise 
false
+const std::string 
SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash");
 } // namespace pegasus
diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h
index 4f4814d..4f131eb 100644
--- a/src/base/pegasus_const.h
+++ b/src/base/pegasus_const.h
@@ -62,4 +62,6 @@ extern const std::string PEGASUS_CLUSTER_SECTION_NAME;
 extern const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD;
 
 extern const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS;
+
+extern const std::string SPLIT_VALIDATE_PARTITION_HASH;
 } // namespace pegasus
diff --git a/src/base/pegasus_key_schema.h b/src/base/pegasus_key_schema.h
index 59fecbf..b894c6d 100644
--- a/src/base/pegasus_key_schema.h
+++ b/src/base/pegasus_key_schema.h
@@ -146,22 +146,23 @@ pegasus_restore_key(const ::dsn::blob &key, std::string &hash_key, std::string &
     }
 }
 
-// calculate hash from rocksdb key.
-inline uint64_t pegasus_key_hash(const ::dsn::blob &key)
+// calculate hash from rocksdb key or rocksdb slice
+template <typename T>
+inline uint64_t pegasus_key_hash(const T &key)
 {
-    dassert(key.length() >= 2, "key length must be no less than 2");
+    dassert(key.size() >= 2, "key length must be no less than 2");
 
     // hash_key_len is in big endian
     uint16_t hash_key_len = be16toh(*(int16_t *)(key.data()));
 
     if (hash_key_len > 0) {
         // hash_key_len > 0, compute hash from hash_key
-        dassert(key.length() >= 2 + hash_key_len,
+        dassert(key.size() >= 2 + hash_key_len,
                 "key length must be no less than (2 + hash_key_len)");
-        return dsn::utils::crc64_calc(key.buffer_ptr() + 2, hash_key_len, 0);
+        return dsn::utils::crc64_calc(key.data() + 2, hash_key_len, 0);
     } else {
         // hash_key_len == 0, compute hash from sort_key
-        return dsn::utils::crc64_calc(key.buffer_ptr() + 2, key.length() - 2, 
0);
+        return dsn::utils::crc64_calc(key.data() + 2, key.size() - 2, 0);
     }
 }
 
@@ -171,4 +172,16 @@ inline uint64_t pegasus_hash_key_hash(const ::dsn::blob &hash_key)
     return dsn::utils::crc64_calc(hash_key.data(), hash_key.length(), 0);
 }
 
+// check key should be served this partition
+// Notice: partition_version should be check if is greater than 0 before 
calling this function
+template <class T>
+inline bool check_pegasus_key_hash(const T &key, int32_t pidx, int32_t 
partition_version)
+{
+    auto target_pidx = pegasus_key_hash(key) & partition_version;
+    if (dsn_unlikely(target_pidx != pidx)) {
+        return false;
+    }
+    return true;
+}
+
 } // namespace pegasus
diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h
index f6f410d..4f26844 100644
--- a/src/server/key_ttl_compaction_filter.h
+++ b/src/server/key_ttl_compaction_filter.h
@@ -25,6 +25,7 @@
 #include <rocksdb/merge_operator.h>
 
 #include "base/pegasus_utils.h"
+#include "base/pegasus_key_schema.h"
 #include "base/pegasus_value_schema.h"
 
 namespace pegasus {
@@ -33,8 +34,18 @@ namespace server {
 class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
 {
 public:
-    KeyWithTTLCompactionFilter(uint32_t pegasus_data_version, uint32_t 
default_ttl, bool enabled)
-        : _pegasus_data_version(pegasus_data_version), 
_default_ttl(default_ttl), _enabled(enabled)
+    KeyWithTTLCompactionFilter(uint32_t pegasus_data_version,
+                               uint32_t default_ttl,
+                               bool enabled,
+                               int32_t pidx,
+                               int32_t partition_version,
+                               bool validate_hash)
+        : _pegasus_data_version(pegasus_data_version),
+          _default_ttl(default_ttl),
+          _enabled(enabled),
+          _partition_index(pidx),
+          _partition_version(partition_version),
+          _validate_partition_hash(validate_hash)
     {
     }
 
@@ -58,16 +69,30 @@ public:
             *value_changed = true;
             return false;
         }
-        return check_if_ts_expired(utils::epoch_now(), expire_ts);
+        return check_if_ts_expired(utils::epoch_now(), expire_ts) || 
check_if_stale_split_data(key);
     }
 
     const char *Name() const override { return "KeyWithTTLCompactionFilter"; }
 
+    // Check if the record is stale after partition split, which will split 
the partition into two
+    // halves. The stale record belongs to the other half.
+    bool check_if_stale_split_data(const rocksdb::Slice &key) const
+    {
+        if (!_validate_partition_hash || key.size() < 2 || _partition_version 
< 0 ||
+            _partition_index > _partition_version) {
+            return false;
+        }
+        return !check_pegasus_key_hash(key, _partition_index, 
_partition_version);
+    }
+
 private:
     uint32_t _pegasus_data_version;
     uint32_t _default_ttl;
     bool _enabled; // only process filtering when _enabled == true
     mutable pegasus_value_generator _gen;
+    int32_t _partition_index;
+    int32_t _partition_version;
+    bool _validate_partition_hash;
 };
 
 class KeyWithTTLCompactionFilterFactory : public 
rocksdb::CompactionFilterFactory
@@ -79,8 +104,13 @@ public:
     std::unique_ptr<rocksdb::CompactionFilter>
     CreateCompactionFilter(const rocksdb::CompactionFilter::Context & 
/*context*/) override
     {
-        return std::unique_ptr<KeyWithTTLCompactionFilter>(new 
KeyWithTTLCompactionFilter(
-            _pegasus_data_version.load(), _default_ttl.load(), 
_enabled.load()));
+        return std::unique_ptr<KeyWithTTLCompactionFilter>(
+            new KeyWithTTLCompactionFilter(_pegasus_data_version.load(),
+                                           _default_ttl.load(),
+                                           _enabled.load(),
+                                           _partition_index.load(),
+                                           _partition_version.load(),
+                                           _validate_partition_hash.load()));
     }
     const char *Name() const override { return 
"KeyWithTTLCompactionFilterFactory"; }
 
@@ -90,11 +120,26 @@ public:
     }
     void EnableFilter() { _enabled.store(true, std::memory_order_release); }
     void SetDefaultTTL(uint32_t ttl) { _default_ttl.store(ttl, 
std::memory_order_release); }
+    void SetValidatePartitionHash(bool validate_hash)
+    {
+        _validate_partition_hash.store(validate_hash, 
std::memory_order_release);
+    }
+    void SetPartitionIndex(int32_t pidx)
+    {
+        _partition_index.store(pidx, std::memory_order_release);
+    }
+    void SetPartitionVersion(int32_t partition_version)
+    {
+        _partition_version.store(partition_version, std::memory_order_release);
+    }
 
 private:
     std::atomic<uint32_t> _pegasus_data_version;
     std::atomic<uint32_t> _default_ttl;
     std::atomic_bool _enabled; // only process filtering when _enabled == true
+    std::atomic<int32_t> _partition_index{0};
+    std::atomic<int32_t> _partition_version{-1};
+    std::atomic_bool _validate_partition_hash{false};
 };
 
 } // namespace server
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 38b15d7..7816bcd 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -1458,6 +1458,8 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
 
     // only enable filter after correct pegasus_data_version set
     
_key_ttl_compaction_filter_factory->SetPegasusDataVersion(_pegasus_data_version);
+    
_key_ttl_compaction_filter_factory->SetPartitionIndex(_gpid.get_partition_index());
+    
_key_ttl_compaction_filter_factory->SetPartitionVersion(_gpid.get_partition_index()
 - 1);
     _key_ttl_compaction_filter_factory->EnableFilter();
 
     parse_checkpoints();
@@ -2294,6 +2296,7 @@ void pegasus_server_impl::update_app_envs(const std::map<std::string, std::strin
     update_checkpoint_reserve(envs);
     update_slow_query_threshold(envs);
     update_rocksdb_iteration_threshold(envs);
+    update_validate_partition_hash(envs);
     _manual_compact_svc.start_manual_compact_if_needed(envs);
 }
 
@@ -2310,6 +2313,7 @@ void pegasus_server_impl::update_app_envs_before_open_db(
     update_checkpoint_reserve(envs);
     update_slow_query_threshold(envs);
     update_rocksdb_iteration_threshold(envs);
+    update_validate_partition_hash(envs);
     _manual_compact_svc.start_manual_compact_if_needed(envs);
 }
 
@@ -2438,6 +2442,25 @@ void pegasus_server_impl::update_rocksdb_iteration_threshold(
     }
 }
 
+void pegasus_server_impl::update_validate_partition_hash(
+    const std::map<std::string, std::string> &envs)
+{
+    bool new_value = false;
+    auto iter = envs.find(SPLIT_VALIDATE_PARTITION_HASH);
+    if (iter != envs.end()) {
+        if (!dsn::buf2bool(iter->second, new_value)) {
+            derror_replica("{}={} is invalid.", iter->first, iter->second);
+            return;
+        }
+    }
+    if (new_value != _validate_partition_hash) {
+        ddebug_replica(
+            "update '_validate_partition_hash' from {} to {}", 
_validate_partition_hash, new_value);
+        _validate_partition_hash = new_value;
+        
_key_ttl_compaction_filter_factory->SetValidatePartitionHash(_validate_partition_hash);
+    }
+}
+
 bool pegasus_server_impl::parse_compression_types(
     const std::string &config, std::vector<rocksdb::CompressionType> 
&compression_per_level)
 {
@@ -2750,8 +2773,7 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version)
     int32_t old_partition_version = 
_partition_version.exchange(partition_version);
     ddebug_replica(
         "update partition version from {} to {}", old_partition_version, 
partition_version);
-
-    // TODO(heyuchen): set filter _partition_version in further pr
+    _key_ttl_compaction_filter_factory->SetPartitionVersion(partition_version);
 }
 
 ::dsn::error_code pegasus_server_impl::flush_all_family_columns(bool wait)
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 5785624..2b314c3 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -262,6 +262,8 @@ private:
 
     void update_rocksdb_iteration_threshold(const std::map<std::string, 
std::string> &envs);
 
+    void update_validate_partition_hash(const std::map<std::string, 
std::string> &envs);
+
     // return true if parse compression types 'config' success, otherwise 
return false.
     // 'compression_per_level' will not be changed if parse failed.
     bool parse_compression_types(const std::string &config,
@@ -402,6 +404,7 @@ private:
     pegasus_manual_compact_service _manual_compact_svc;
 
     std::atomic<int32_t> _partition_version;
+    bool _validate_partition_hash{false};
 
     dsn::replication::ingestion_status::type _ingestion_status{
         dsn::replication::ingestion_status::IS_INVALID};

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to