This is an automated email from the ASF dual-hosted git repository.

zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new ea1fdc7  feat: start compaction operation in CompactionFilter::Filter 
(#780)
ea1fdc7 is described below

commit ea1fdc7a8731dadfd336ef41009e4b4e3fd58f81
Author: zhao liwei <[email protected]>
AuthorDate: Tue Jul 20 12:07:45 2021 +0800

    feat: start compaction operation in CompactionFilter::Filter (#780)
---
 src/server/compaction_operation.cpp    |  2 +-
 src/server/compaction_operation.h      |  2 +-
 src/server/key_ttl_compaction_filter.h | 43 +++++++++++++++++++++++++++++++---
 3 files changed, 42 insertions(+), 5 deletions(-)

diff --git a/src/server/compaction_operation.cpp 
b/src/server/compaction_operation.cpp
index 86b12c0..4b81040 100644
--- a/src/server/compaction_operation.cpp
+++ b/src/server/compaction_operation.cpp
@@ -174,7 +174,7 @@ compaction_operations create_compaction_operations(const 
std::string &json, uint
             enum_to_string(op.type), dsn::PROVIDER_TYPE_MAIN, op.params, 
data_version);
         if (operation != nullptr) {
             operation->set_rules(std::move(rules));
-            res.emplace_back(std::unique_ptr<compaction_operation>(operation));
+            res.emplace_back(std::shared_ptr<compaction_operation>(operation));
         }
     }
     return res;
diff --git a/src/server/compaction_operation.h 
b/src/server/compaction_operation.h
index f1cdf93..e20b3b4 100644
--- a/src/server/compaction_operation.h
+++ b/src/server/compaction_operation.h
@@ -159,7 +159,7 @@ private:
     FRIEND_TEST(compaction_filter_operation_test, create_operations);
 };
 
-typedef std::vector<std::unique_ptr<compaction_operation>> 
compaction_operations;
+typedef std::vector<std::shared_ptr<compaction_operation>> 
compaction_operations;
 compaction_operations create_compaction_operations(const std::string &json, 
uint32_t data_version);
 void register_compaction_operations();
 } // namespace server
diff --git a/src/server/key_ttl_compaction_filter.h 
b/src/server/key_ttl_compaction_filter.h
index c049e83..e3f94a3 100644
--- a/src/server/key_ttl_compaction_filter.h
+++ b/src/server/key_ttl_compaction_filter.h
@@ -40,13 +40,15 @@ public:
                                bool enabled,
                                int32_t pidx,
                                int32_t partition_version,
-                               bool validate_hash)
+                               bool validate_hash,
+                               compaction_operations &&compaction_ops)
         : _pegasus_data_version(pegasus_data_version),
           _default_ttl(default_ttl),
           _enabled(enabled),
           _partition_index(pidx),
           _partition_version(partition_version),
-          _validate_partition_hash(validate_hash)
+          _validate_partition_hash(validate_hash),
+          _user_specified_operations(std::move(compaction_ops))
     {
     }
 
@@ -60,6 +62,17 @@ public:
             return false;
         }
 
+        // ignore empty write. Empty writes will be deleted by the compaction of 
rocksdb. We don't need
+        // to deal with it here.
+        if (key.size() < 2) {
+            return false;
+        }
+
+        if (!_user_specified_operations.empty() &&
+            user_specified_operation_filter(key, existing_value, new_value, 
value_changed)) {
+            return true;
+        }
+
         uint32_t expire_ts =
             pegasus_extract_expire_ts(_pegasus_data_version, 
utils::to_string_view(existing_value));
         if (_default_ttl != 0 && expire_ts == 0) {
@@ -73,6 +86,22 @@ public:
         return check_if_ts_expired(utils::epoch_now(), expire_ts) || 
check_if_stale_split_data(key);
     }
 
+    bool user_specified_operation_filter(const rocksdb::Slice &key,
+                                         const rocksdb::Slice &existing_value,
+                                         std::string *new_value,
+                                         bool *value_changed) const
+    {
+        std::string hash_key, sort_key;
+        pegasus_restore_key(dsn::blob(key.data(), 0, key.size()), hash_key, 
sort_key);
+        for (const auto &op : _user_specified_operations) {
+            if (op->filter(hash_key, sort_key, existing_value, new_value, 
value_changed)) {
+                // return true if this data needs to be deleted
+                return true;
+            }
+        }
+        return false;
+    }
+
     const char *Name() const override { return "KeyWithTTLCompactionFilter"; }
 
     // Check if the record is stale after partition split, which will split 
the partition into two
@@ -94,6 +123,7 @@ private:
     int32_t _partition_index;
     int32_t _partition_version;
     bool _validate_partition_hash;
+    compaction_operations _user_specified_operations;
 };
 
 class KeyWithTTLCompactionFilterFactory : public 
rocksdb::CompactionFilterFactory
@@ -105,13 +135,20 @@ public:
     std::unique_ptr<rocksdb::CompactionFilter>
     CreateCompactionFilter(const rocksdb::CompactionFilter::Context & 
/*context*/) override
     {
+        compaction_operations tmp_filter_operations;
+        {
+            dsn::utils::auto_read_lock l(_lock);
+            tmp_filter_operations = _user_specified_operations;
+        }
+
         return std::unique_ptr<KeyWithTTLCompactionFilter>(
             new KeyWithTTLCompactionFilter(_pegasus_data_version.load(),
                                            _default_ttl.load(),
                                            _enabled.load(),
                                            _partition_index.load(),
                                            _partition_version.load(),
-                                           _validate_partition_hash.load()));
+                                           _validate_partition_hash.load(),
+                                           std::move(tmp_filter_operations)));
     }
     const char *Name() const override { return 
"KeyWithTTLCompactionFilterFactory"; }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to