This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new e637541f5 fix(zookeeper): fix potential dangling path pointer issue when calling ZooKeeper APIs (#2302)
e637541f5 is described below

commit e637541f542470fb45c7f386d66a8b5c0fb9b25a
Author: Dan Wang <[email protected]>
AuthorDate: Tue Oct 28 11:12:28 2025 +0800

    fix(zookeeper): fix potential dangling path pointer issue when calling ZooKeeper APIs (#2302)
    
    Fix https://github.com/apache/incubator-pegasus/issues/2307.
    
    The heap-use-after-free error found in the ASan test was caused by a
    dangling pointer issue: the `path` pointer was freed in the completion
    callback thread while it was still being used by the ZooKeeper C client API.
    
    To fix this problem, `std::shared_ptr` is introduced to manage the `path`,
    ensuring that the memory is automatically released only when all references
    to it have been destroyed.
---
 src/meta/meta_backup_service.cpp                   |  13 +-
 src/meta/meta_bulk_load_service.cpp                |   7 +-
 src/meta/meta_service.cpp                          |  12 +-
 src/meta/meta_split_service.cpp                    |  23 +--
 src/meta/meta_state_service.h                      |  24 ++-
 src/meta/meta_state_service_simple.h               |  79 ++++-----
 src/meta/meta_state_service_zookeeper.cpp          |   3 +-
 src/meta/meta_state_service_zookeeper.h            |  81 ++++-----
 src/meta/server_state.cpp                          |  44 ++---
 src/meta/test/backup_test.cpp                      |   4 +-
 src/meta/test/meta_http_service_test.cpp           |   2 +-
 src/meta/test/meta_state/meta_state_service.cpp    | 188 +++++++++++++--------
 src/ranger/ranger_resource_policy_manager.cpp      |  10 +-
 .../distributed_lock_service_zookeeper.cpp         |  11 +-
 src/zookeeper/lock_struct.cpp                      |  88 +++++-----
 src/zookeeper/test/zookeeper_session_test_base.cpp |   5 +-
 src/zookeeper/zookeeper_session.cpp                |  73 ++++----
 src/zookeeper/zookeeper_session.h                  |  22 +--
 18 files changed, 392 insertions(+), 297 deletions(-)

diff --git a/src/meta/meta_backup_service.cpp b/src/meta/meta_backup_service.cpp
index 446199c62..e4140fc7e 100644
--- a/src/meta/meta_backup_service.cpp
+++ b/src/meta/meta_backup_service.cpp
@@ -1129,7 +1129,7 @@ backup_service::backup_service(meta_service *meta_svc,
 void backup_service::start_create_policy_meta_root(dsn::task_ptr callback)
 {
     LOG_DEBUG("create policy meta root({}) on remote_storage", 
_policy_meta_root);
-    _meta_svc->get_remote_storage()->create_node(
+    _meta_svc->get_remote_storage()->create_empty_node(
         _policy_meta_root, LPC_DEFAULT_CALLBACK, [this, 
callback](dsn::error_code err) {
             if (err == dsn::ERR_OK || err == ERR_NODE_ALREADY_EXIST) {
                 LOG_INFO(
@@ -1438,7 +1438,8 @@ void backup_service::do_add_policy(dsn::message_ex *req,
                 CHECK(false, "we can't handle this when create backup policy, 
err({})", err);
             }
         },
-        value);
+        value,
+        nullptr);
 }
 
 void backup_service::do_update_policy_to_remote_storage(
@@ -1449,7 +1450,10 @@ void backup_service::do_update_policy_to_remote_storage(
     std::string policy_path = get_policy_path(p.policy_name);
     blob value = json::json_forwarder<policy>::encode(p);
     _meta_svc->get_remote_storage()->set_data(
-        policy_path, value, LPC_DEFAULT_CALLBACK, [this, rpc, p, 
p_context_ptr](error_code err) {
+        policy_path,
+        value,
+        LPC_DEFAULT_CALLBACK,
+        [this, rpc, p, p_context_ptr](error_code err) {
             if (err == ERR_OK) {
                 configuration_modify_backup_policy_response resp;
                 resp.err = ERR_OK;
@@ -1473,7 +1477,8 @@ void backup_service::do_update_policy_to_remote_storage(
             } else {
                 CHECK(false, "we can't handle this when create backup policy, 
err({})", err);
             }
-        });
+        },
+        nullptr);
 }
 
 bool backup_service::is_valid_policy_name_unlocked(const std::string 
&policy_name,
diff --git a/src/meta/meta_bulk_load_service.cpp 
b/src/meta/meta_bulk_load_service.cpp
index ba7a2556b..fd2d15a91 100644
--- a/src/meta/meta_bulk_load_service.cpp
+++ b/src/meta/meta_bulk_load_service.cpp
@@ -2098,7 +2098,9 @@ void 
bulk_load_service::check_app_bulk_load_states(std::shared_ptr<app_state> ap
 {
     std::string app_path = get_app_bulk_load_path(app->app_id);
     _meta_svc->get_remote_storage()->node_exist(
-        app_path, LPC_META_CALLBACK, [this, app_path, app, 
is_app_bulk_loading](error_code err) {
+        app_path,
+        LPC_META_CALLBACK,
+        [this, app_path, app, is_app_bulk_loading](error_code err) {
             if (err != ERR_OK && err != ERR_OBJECT_NOT_FOUND) {
                 LOG_WARNING("check app({}) bulk load dir({}) failed, error = 
{}, try later",
                             app->app_name,
@@ -2128,7 +2130,8 @@ void 
bulk_load_service::check_app_bulk_load_states(std::shared_ptr<app_state> ap
             // Normal cases:
             // err = ERR_OBJECT_NOT_FOUND, is_app_bulk_load = false: app is 
not executing bulk load
             // err = ERR_OK, is_app_bulk_load = true: app used to be executing 
bulk load
-        });
+        },
+        nullptr);
 }
 
 } // namespace replication
diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp
index 010a69736..09be67773 100644
--- a/src/meta/meta_service.cpp
+++ b/src/meta/meta_service.cpp
@@ -234,18 +234,18 @@ error_code meta_service::remote_storage_initialize()
 
     std::vector<std::string> slices;
     utils::split_args(FLAGS_cluster_root, slices, '/');
-    std::string current = "";
-    for (unsigned int i = 0; i != slices.size(); ++i) {
-        current = utils::filesystem::concat_path_unix_style(current, 
slices[i]);
-        task_ptr tsk =
-            _storage->create_node(current, LPC_META_CALLBACK, 
[&err](error_code ec) { err = ec; });
+    std::string current;
+    for (const auto &slice : slices) {
+        current = utils::filesystem::concat_path_unix_style(current, slice);
+        task_ptr tsk = _storage->create_empty_node(
+            current, LPC_META_CALLBACK, [&err](error_code ec) { err = ec; });
         tsk->wait();
         if (err != ERR_OK && err != ERR_NODE_ALREADY_EXIST) {
             LOG_ERROR("create node failed, node_path = {}, err = {}", current, 
err);
             return err;
         }
     }
-    _cluster_root = current.empty() ? "/" : current;
+    _cluster_root = current.empty() ? "/" : std::move(current);
 
     LOG_INFO("init meta_state_service succeed, cluster_root = {}", 
_cluster_root);
     return ERR_OK;
diff --git a/src/meta/meta_split_service.cpp b/src/meta/meta_split_service.cpp
index b1e7cd708..79a12da5d 100644
--- a/src/meta/meta_split_service.cpp
+++ b/src/meta/meta_split_service.cpp
@@ -248,19 +248,20 @@ dsn::task_ptr 
meta_split_service::add_child_on_remote_storage(register_child_rpc
                       std::placeholders::_1,
                       rpc,
                       create_new),
-            value);
-    } else {
-        return _meta_svc->get_remote_storage()->set_data(
-            partition_path,
             value,
-            LPC_META_STATE_HIGH,
-            
std::bind(&meta_split_service::on_add_child_on_remote_storage_reply,
-                      this,
-                      std::placeholders::_1,
-                      rpc,
-                      create_new),
-            _meta_svc->tracker());
+            nullptr);
     }
+
+    return _meta_svc->get_remote_storage()->set_data(
+        partition_path,
+        value,
+        LPC_META_STATE_HIGH,
+        std::bind(&meta_split_service::on_add_child_on_remote_storage_reply,
+                  this,
+                  std::placeholders::_1,
+                  rpc,
+                  create_new),
+        _meta_svc->tracker());
 }
 
 void meta_split_service::on_add_child_on_remote_storage_reply(error_code ec,
diff --git a/src/meta/meta_state_service.h b/src/meta/meta_state_service.h
index dab812ef9..ec77ff563 100644
--- a/src/meta/meta_state_service.h
+++ b/src/meta/meta_state_service.h
@@ -104,7 +104,7 @@ public:
     virtual task_ptr submit_transaction(const 
std::shared_ptr<transaction_entries> &entries,
                                         task_code cb_code,
                                         const err_callback &cb_transaction,
-                                        dsn::task_tracker *tracker = nullptr) 
= 0;
+                                        dsn::task_tracker *tracker) = 0;
 
     /*
      * create a dir node
@@ -117,8 +117,16 @@ public:
     virtual task_ptr create_node(const std::string &node,
                                  task_code cb_code,
                                  const err_callback &cb_create,
-                                 const blob &value = blob(),
-                                 dsn::task_tracker *tracker = nullptr) = 0;
+                                 const blob &value,
+                                 dsn::task_tracker *tracker) = 0;
+
+    // The same as create_node(), except that `value` is empty and `tracker` 
is nullptr.
+    task_ptr
+    create_empty_node(const std::string &node, task_code cb_code, const 
err_callback &cb_create)
+    {
+        return create_node(node, cb_code, cb_create, blob(), nullptr);
+    }
+
     /*
      * delete a dir, the directory may be empty or not
      * node: the dir name with full path
@@ -131,7 +139,7 @@ public:
                                  bool recursively_delete,
                                  task_code cb_code,
                                  const err_callback &cb_delete,
-                                 dsn::task_tracker *tracker = nullptr) = 0;
+                                 dsn::task_tracker *tracker) = 0;
     /*
      * check if the node dir exists
      * node: the dir name with full path
@@ -141,7 +149,7 @@ public:
     virtual task_ptr node_exist(const std::string &node,
                                 task_code cb_code,
                                 const err_callback &cb_exist,
-                                dsn::task_tracker *tracker = nullptr) = 0;
+                                dsn::task_tracker *tracker) = 0;
     /*
      * get the data in node
      * node: dir name with full path
@@ -153,7 +161,7 @@ public:
     virtual task_ptr get_data(const std::string &node,
                               task_code cb_code,
                               const err_value_callback &cb_get_data,
-                              dsn::task_tracker *tracker = nullptr) = 0;
+                              dsn::task_tracker *tracker) = 0;
     /*
      * set the data of the node
      * node: dir name with full path
@@ -165,7 +173,7 @@ public:
                               const blob &value,
                               task_code cb_code,
                               const err_callback &cb_set_data,
-                              dsn::task_tracker *tracker = nullptr) = 0;
+                              dsn::task_tracker *tracker) = 0;
     /*
      * get all childrens of a node
      * node: dir name with full path
@@ -175,7 +183,7 @@ public:
     virtual task_ptr get_children(const std::string &node,
                                   task_code cb_code,
                                   const err_stringv_callback &cb_get_children,
-                                  dsn::task_tracker *tracker = nullptr) = 0;
+                                  dsn::task_tracker *tracker) = 0;
 };
 } // namespace dist
 } // namespace dsn
diff --git a/src/meta/meta_state_service_simple.h 
b/src/meta/meta_state_service_simple.h
index 9a5a6b395..5a6c9ef38 100644
--- a/src/meta/meta_state_service_simple.h
+++ b/src/meta/meta_state_service_simple.h
@@ -72,51 +72,52 @@ public:
     }
 
     // work_path = (argc > 0 ? argv[0] : current_app_data_dir)
-    virtual error_code initialize(const std::vector<std::string> &args) 
override;
-    virtual error_code finalize() override { return ERR_OK; }
+    error_code initialize(const std::vector<std::string> &args) override;
+    error_code finalize() override { return ERR_OK; }
 
-    virtual std::shared_ptr<meta_state_service::transaction_entries>
+    std::shared_ptr<meta_state_service::transaction_entries>
     new_transaction_entries(unsigned int capacity) override;
 
-    virtual task_ptr
+    task_ptr
     submit_transaction(const 
std::shared_ptr<meta_state_service::transaction_entries> &t_entries,
                        task_code cb_code,
                        const err_callback &cb_create_tree,
-                       dsn::task_tracker *tracker = nullptr) override;
-
-    virtual task_ptr create_node(const std::string &node,
-                                 task_code cb_code,
-                                 const err_callback &cb_create,
-                                 const blob &value = blob(),
-                                 dsn::task_tracker *tracker = nullptr) 
override;
-
-    virtual task_ptr delete_node(const std::string &node,
-                                 bool recursively_delete,
-                                 task_code cb_code,
-                                 const err_callback &cb_delete,
-                                 dsn::task_tracker *tracker = nullptr) 
override;
-
-    virtual task_ptr node_exist(const std::string &node,
-                                task_code cb_code,
-                                const err_callback &cb_exist,
-                                dsn::task_tracker *tracker = nullptr) override;
-
-    virtual task_ptr get_data(const std::string &node,
-                              task_code cb_code,
-                              const err_value_callback &cb_get_data,
-                              dsn::task_tracker *tracker = nullptr) override;
-
-    virtual task_ptr set_data(const std::string &node,
-                              const blob &value,
-                              task_code cb_code,
-                              const err_callback &cb_set_data,
-                              dsn::task_tracker *tracker = nullptr) override;
-
-    virtual task_ptr get_children(const std::string &node,
-                                  task_code cb_code,
-                                  const err_stringv_callback &cb_get_children,
-                                  dsn::task_tracker *tracker = nullptr) 
override;
-    virtual ~meta_state_service_simple() override;
+                       dsn::task_tracker *tracker) override;
+
+    task_ptr create_node(const std::string &node,
+                         task_code cb_code,
+                         const err_callback &cb_create,
+                         const blob &value,
+                         dsn::task_tracker *tracker) override;
+
+    task_ptr delete_node(const std::string &node,
+                         bool recursively_delete,
+                         task_code cb_code,
+                         const err_callback &cb_delete,
+                         dsn::task_tracker *tracker) override;
+
+    task_ptr node_exist(const std::string &node,
+                        task_code cb_code,
+                        const err_callback &cb_exist,
+                        dsn::task_tracker *tracker) override;
+
+    task_ptr get_data(const std::string &node,
+                      task_code cb_code,
+                      const err_value_callback &cb_get_data,
+                      dsn::task_tracker *tracker) override;
+
+    task_ptr set_data(const std::string &node,
+                      const blob &value,
+                      task_code cb_code,
+                      const err_callback &cb_set_data,
+                      dsn::task_tracker *tracker) override;
+
+    task_ptr get_children(const std::string &node,
+                          task_code cb_code,
+                          const err_stringv_callback &cb_get_children,
+                          dsn::task_tracker *tracker) override;
+
+    ~meta_state_service_simple() override;
 
 private:
     struct operation
diff --git a/src/meta/meta_state_service_zookeeper.cpp 
b/src/meta/meta_state_service_zookeeper.cpp
index e9c659b6a..c84fa86b9 100644
--- a/src/meta/meta_state_service_zookeeper.cpp
+++ b/src/meta/meta_state_service_zookeeper.cpp
@@ -35,6 +35,7 @@
 
 #include "meta_state_service_zookeeper.h"
 #include "runtime/service_app.h"
+#include "utils/blob.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
 #include "utils/utils.h"
@@ -199,7 +200,7 @@ 
meta_state_service_zookeeper::new_transaction_entries(unsigned int capacity)
                                        tsk,                                    
                    \
                                        std::placeholders::_1);                 
                    \
     op->_optype = op_type;                                                     
                    \
-    input->_path = node;
+    input->_path = std::make_shared<std::string>(node)
 
 task_ptr meta_state_service_zookeeper::create_node(const std::string &node,
                                                    task_code cb_code,
diff --git a/src/meta/meta_state_service_zookeeper.h 
b/src/meta/meta_state_service_zookeeper.h
index f7194e29e..73129df39 100644
--- a/src/meta/meta_state_service_zookeeper.h
+++ b/src/meta/meta_state_service_zookeeper.h
@@ -37,11 +37,12 @@
 #include "task/task_code.h"
 #include "task/task_tracker.h"
 #include "utils/autoref_ptr.h"
-#include "utils/blob.h"
 #include "utils/error_code.h"
 #include "utils/synchronize.h"
 
 namespace dsn {
+class blob;
+
 namespace dist {
 
 class zookeeper_session;
@@ -50,53 +51,53 @@ class meta_state_service_zookeeper : public 
meta_state_service, public ref_count
 {
 public:
     explicit meta_state_service_zookeeper();
-    virtual ~meta_state_service_zookeeper() override;
+    ~meta_state_service_zookeeper() override;
 
     // no parameter need
-    virtual error_code initialize(const std::vector<std::string> &args) 
override;
-    virtual error_code finalize() override;
+    error_code initialize(const std::vector<std::string> &args) override;
+    error_code finalize() override;
 
-    virtual std::shared_ptr<meta_state_service::transaction_entries>
+    std::shared_ptr<meta_state_service::transaction_entries>
     new_transaction_entries(unsigned int capacity) override;
 
-    virtual task_ptr
+    task_ptr
     submit_transaction(const 
std::shared_ptr<meta_state_service::transaction_entries> &entries,
                        task_code cb_code,
                        const err_callback &cb_transaction,
-                       task_tracker *tracker = nullptr) override;
-
-    virtual task_ptr create_node(const std::string &node,
-                                 task_code cb_code,
-                                 const err_callback &cb_create,
-                                 const blob &value = blob(),
-                                 dsn::task_tracker *tracker = nullptr) 
override;
-
-    virtual task_ptr delete_node(const std::string &node,
-                                 bool recursively_delete,
-                                 task_code cb_code,
-                                 const err_callback &cb_delete,
-                                 dsn::task_tracker *tracker = nullptr) 
override;
-
-    virtual task_ptr node_exist(const std::string &node,
-                                task_code cb_code,
-                                const err_callback &cb_exist,
-                                dsn::task_tracker *tracker = nullptr) override;
-
-    virtual task_ptr get_data(const std::string &node,
-                              task_code cb_code,
-                              const err_value_callback &cb_get_data,
-                              dsn::task_tracker *tracker = nullptr) override;
-
-    virtual task_ptr set_data(const std::string &node,
-                              const blob &value,
-                              task_code cb_code,
-                              const err_callback &cb_set_data,
-                              dsn::task_tracker *tracker = nullptr) override;
-
-    virtual task_ptr get_children(const std::string &node,
-                                  task_code cb_code,
-                                  const err_stringv_callback &cb_get_children,
-                                  dsn::task_tracker *tracker = nullptr) 
override;
+                       task_tracker *tracker) override;
+
+    task_ptr create_node(const std::string &node,
+                         task_code cb_code,
+                         const err_callback &cb_create,
+                         const blob &value,
+                         dsn::task_tracker *tracker) override;
+
+    task_ptr delete_node(const std::string &node,
+                         bool recursively_delete,
+                         task_code cb_code,
+                         const err_callback &cb_delete,
+                         dsn::task_tracker *tracker) override;
+
+    task_ptr node_exist(const std::string &node,
+                        task_code cb_code,
+                        const err_callback &cb_exist,
+                        dsn::task_tracker *tracker) override;
+
+    task_ptr get_data(const std::string &node,
+                      task_code cb_code,
+                      const err_value_callback &cb_get_data,
+                      dsn::task_tracker *tracker) override;
+
+    task_ptr set_data(const std::string &node,
+                      const blob &value,
+                      task_code cb_code,
+                      const err_callback &cb_set_data,
+                      dsn::task_tracker *tracker) override;
+
+    task_ptr get_children(const std::string &node,
+                          task_code cb_code,
+                          const err_stringv_callback &cb_get_children,
+                          dsn::task_tracker *tracker) override;
 
     task_ptr delete_empty_node(const std::string &node,
                                task_code cb_code,
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index 4e504ff24..93f56cab0 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -530,7 +530,8 @@ error_code server_state::sync_apps_to_remote_storage()
         apps_path,
         LPC_META_CALLBACK,
         [&err](error_code ec) { err = ec; },
-        blob(lock_state, 0, strlen(lock_state)));
+        blob(lock_state, 0, strlen(lock_state)),
+        nullptr);
     t->wait();
 
     if (err != ERR_NODE_ALREADY_EXIST && err != ERR_OK) {
@@ -581,10 +582,12 @@ error_code server_state::sync_apps_to_remote_storage()
     }
 
     tracker.wait_outstanding_tasks();
-    t = _meta_svc->get_remote_storage()->set_data(_apps_root,
-                                                  blob(unlock_state, 0, 
strlen(unlock_state)),
-                                                  LPC_META_STATE_HIGH,
-                                                  [&err](dsn::error_code e) { 
err = e; });
+    t = _meta_svc->get_remote_storage()->set_data(
+        _apps_root,
+        blob(unlock_state, 0, strlen(unlock_state)),
+        LPC_META_STATE_HIGH,
+        [&err](dsn::error_code e) { err = e; },
+        nullptr);
     t->wait();
     if (dsn::ERR_OK == err) {
         LOG_INFO("set {} to unlock state in remote storage", _apps_root);
@@ -725,18 +728,21 @@ dsn::error_code 
server_state::sync_apps_from_remote_storage()
 
     std::string transaction_state;
     storage
-        ->get_data(_apps_root,
-                   LPC_META_CALLBACK,
-                   [&err, &transaction_state](error_code ec, const blob 
&value) {
-                       err = ec;
-                       if (ec == dsn::ERR_OK) {
-                           transaction_state.assign(value.data(), 
value.length());
-                       }
-                   })
+        ->get_data(
+            _apps_root,
+            LPC_META_CALLBACK,
+            [&err, &transaction_state](error_code ec, const blob &value) {
+                err = ec;
+                if (ec == dsn::ERR_OK) {
+                    transaction_state.assign(value.data(), value.length());
+                }
+            },
+            nullptr)
         ->wait();
 
-    if (ERR_OBJECT_NOT_FOUND == err)
+    if (ERR_OBJECT_NOT_FOUND == err) {
         return err;
+    }
     CHECK_EQ_MSG(ERR_OK, err, "can't handle this error");
     CHECK(transaction_state == std::string(unlock_state) || 
transaction_state.empty(),
           "invalid transaction state({})",
@@ -1105,7 +1111,7 @@ void 
server_state::init_app_partition_node(std::shared_ptr<app_state> &app,
     std::string app_partition_path = get_partition_path(*app, pidx);
     dsn::blob value = 
dsn::json::json_forwarder<partition_configuration>::encode(app->pcs[pidx]);
     _meta_svc->get_remote_storage()->create_node(
-        app_partition_path, LPC_META_STATE_HIGH, on_create_app_partition, 
value);
+        app_partition_path, LPC_META_STATE_HIGH, on_create_app_partition, 
value, nullptr);
 }
 
 void server_state::get_allowed_partitions(dsn::message_ex *msg,
@@ -1150,7 +1156,7 @@ void 
server_state::do_app_create(std::shared_ptr<app_state> &app)
     std::string app_dir = get_app_path(*app);
     blob value = app->to_json(app_status::AS_AVAILABLE);
     _meta_svc->get_remote_storage()->create_node(
-        app_dir, LPC_META_STATE_HIGH, on_create_app_root, value);
+        app_dir, LPC_META_STATE_HIGH, on_create_app_root, value, nullptr);
 }
 
 void server_state::create_app(dsn::message_ex *msg)
@@ -1443,7 +1449,7 @@ void server_state::do_app_drop(std::shared_ptr<app_state> 
&app)
     blob json_app = app->to_json(app_status::AS_DROPPED);
     std::string app_path = get_app_path(*app);
     _meta_svc->get_remote_storage()->set_data(
-        app_path, json_app, LPC_META_STATE_HIGH, after_mark_app_dropped);
+        app_path, json_app, LPC_META_STATE_HIGH, after_mark_app_dropped, 
nullptr);
 }
 
 void server_state::drop_app(dsn::message_ex *msg)
@@ -1593,7 +1599,7 @@ void 
server_state::do_app_recall(std::shared_ptr<app_state> &app)
     std::string app_path = get_app_path(*app);
     blob value = app->to_json(app_status::AS_AVAILABLE);
     _meta_svc->get_remote_storage()->set_data(
-        app_path, value, LPC_META_STATE_HIGH, after_recall_app);
+        app_path, value, LPC_META_STATE_HIGH, after_recall_app, nullptr);
 }
 
 void server_state::recall_app(dsn::message_ex *msg)
@@ -2079,7 +2085,7 @@ void 
server_state::recall_partition(std::shared_ptr<app_state> &app, int pidx)
     blob json_partition = 
dsn::json::json_forwarder<partition_configuration>::encode(pc);
     std::string partition_path = get_partition_path(pc.pid);
     _meta_svc->get_remote_storage()->set_data(
-        partition_path, json_partition, LPC_META_STATE_HIGH, 
on_recall_partition);
+        partition_path, json_partition, LPC_META_STATE_HIGH, 
on_recall_partition, nullptr);
 }
 
 void server_state::drop_partition(std::shared_ptr<app_state> &app, int pidx)
diff --git a/src/meta/test/backup_test.cpp b/src/meta/test/backup_test.cpp
index 0c9dee7eb..13e119617 100644
--- a/src/meta/test/backup_test.cpp
+++ b/src/meta/test/backup_test.cpp
@@ -254,7 +254,7 @@ protected:
         _service->_backup_handler->backup_option().request_backup_period_ms = 
20_ms;
         _service->_backup_handler->backup_option().issue_backup_interval_ms = 
1000_ms;
         _service->_storage
-            ->create_node(
+            ->create_empty_node(
                 policy_root, dsn::TASK_CODE_EXEC_INLINED, 
[&ec](dsn::error_code err) { ec = err; })
             ->wait();
         ASSERT_EQ(dsn::ERR_OK, ec);
@@ -274,7 +274,7 @@ protected:
         _mp.set_policy(_policy);
 
         _service->_storage
-            ->create_node(
+            ->create_empty_node(
                 policy_dir, dsn::TASK_CODE_EXEC_INLINED, [&ec](dsn::error_code 
err) { ec = err; })
             ->wait();
         ASSERT_EQ(dsn::ERR_OK, ec);
diff --git a/src/meta/test/meta_http_service_test.cpp 
b/src/meta/test/meta_http_service_test.cpp
index 8e0095bec..8611868a5 100644
--- a/src/meta/test/meta_http_service_test.cpp
+++ b/src/meta/test/meta_http_service_test.cpp
@@ -123,7 +123,7 @@ public:
         const std::string policy_root = "/test";
         dsn::error_code ec;
         _ms->_storage
-            ->create_node(
+            ->create_empty_node(
                 _policy_root, dsn::TASK_CODE_EXEC_INLINED, 
[&ec](dsn::error_code err) { ec = err; })
             ->wait();
         _mhs = std::make_unique<meta_http_service>(_ms.get());
diff --git a/src/meta/test/meta_state/meta_state_service.cpp 
b/src/meta/test/meta_state/meta_state_service.cpp
index 6ca22721d..880155dbc 100644
--- a/src/meta/test/meta_state/meta_state_service.cpp
+++ b/src/meta/test/meta_state/meta_state_service.cpp
@@ -27,6 +27,7 @@
 #include "meta/meta_state_service.h"
 
 #include <boost/lexical_cast.hpp>
+#include <array>
 #include <chrono>
 #include <ostream>
 #include <thread>
@@ -61,42 +62,56 @@ void provider_basic_test(const service_creator_func 
&service_creator,
                          const service_deleter_func &service_deleter)
 {
     // environment
-    auto service = service_creator();
+    auto *service = service_creator();
 
     // bondary check
-    service->node_exist("/", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_ok)->wait();
-    service->node_exist("", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_err)->wait();
+    service->node_exist("/", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_ok, nullptr)->wait();
+    service->node_exist("", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_err, nullptr)->wait();
     // recursive delete test
     {
-        service->create_node("/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_ok)->wait();
-        service->node_exist("/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_ok)->wait();
-        service->create_node("/1/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_ok)->wait();
-        service->get_children("/1",
-                              META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
-                              [](error_code ec, const std::vector<std::string> 
&children) {
-                                  CHECK_EQ(ERR_OK, ec);
-                                  CHECK_EQ(1, children.size());
-                                  CHECK_EQ("1", *children.begin());
-                              });
-        service->node_exist("/1/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_ok)->wait();
-        service->delete_node("/1", false, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_err)
+        service->create_empty_node("/1", 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok)
             ->wait();
-        service->delete_node("/1", true, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok)
+        service->node_exist("/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_ok, nullptr)
+            ->wait();
+        service->create_empty_node("/1/1", 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok)
+            ->wait();
+        service->get_children(
+            "/1",
+            META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
+            [](error_code ec, const std::vector<std::string> &children) {
+                CHECK_EQ(ERR_OK, ec);
+                CHECK_EQ(1, children.size());
+                CHECK_EQ("1", *children.begin());
+            },
+            nullptr);
+        service->node_exist("/1/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_ok, nullptr)
+            ->wait();
+        service
+            ->delete_node("/1", false, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_err, nullptr)
+            ->wait();
+        service
+            ->delete_node("/1", true, META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_ok, nullptr)
+            ->wait();
+        service->node_exist("/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_err, nullptr)
             ->wait();
-        service->node_exist("/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_err)->wait();
     }
     // repeat create test
     {
-        service->create_node("/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_ok)->wait();
-        service->create_node("/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_err)->wait();
+        service->create_empty_node("/1", 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok)
+            ->wait();
+        service->create_empty_node("/1", 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_err)
+            ->wait();
     }
     // check replay
     {
         service_deleter(service);
         service = service_creator();
-        service->node_exist("/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_ok)->wait();
-        service->node_exist("/1/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_err)->wait();
-        service->delete_node("/1", false, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok)
+        service->node_exist("/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_ok, nullptr)
+            ->wait();
+        service->node_exist("/1/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_err, nullptr)
+            ->wait();
+        service
+            ->delete_node("/1", false, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok, nullptr)
             ->wait();
     }
     // create & get data
@@ -104,19 +119,24 @@ void provider_basic_test(const service_creator_func 
&service_creator,
         dsn::binary_writer writer;
         writer.write(0xdeadbeef);
         service
-            ->create_node(
-                "/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok, 
writer.get_buffer())
+            ->create_node("/1",
+                          META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
+                          expect_ok,
+                          writer.get_buffer(),
+                          nullptr)
             ->wait();
         service
-            ->get_data("/1",
-                       META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
-                       [](error_code ec, const dsn::blob &value) {
-                           CHECK_EQ(ERR_OK, ec);
-                           dsn::binary_reader reader(value);
-                           int read_value = 0;
-                           reader.read(read_value);
-                           CHECK_EQ(0xdeadbeef, read_value);
-                       })
+            ->get_data(
+                "/1",
+                META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
+                [](error_code ec, const dsn::blob &value) {
+                    CHECK_EQ(ERR_OK, ec);
+                    dsn::binary_reader reader(value);
+                    int read_value = 0;
+                    reader.read(read_value);
+                    CHECK_EQ(0xdeadbeef, read_value);
+                },
+                nullptr)
             ->wait();
     }
     // set & get data
@@ -124,24 +144,30 @@ void provider_basic_test(const service_creator_func 
&service_creator,
         dsn::binary_writer writer;
         writer.write(0xbeefdead);
         service
-            ->set_data(
-                "/1", writer.get_buffer(), 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok)
+            ->set_data("/1",
+                       writer.get_buffer(),
+                       META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
+                       expect_ok,
+                       nullptr)
             ->wait();
         service
-            ->get_data("/1",
-                       META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
-                       [](error_code ec, const dsn::blob &value) {
-                           CHECK_EQ(ERR_OK, ec);
-                           dsn::binary_reader reader(value);
-                           int read_value = 0;
-                           reader.read(read_value);
-                           CHECK_EQ(0xbeefdead, read_value);
-                       })
+            ->get_data(
+                "/1",
+                META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
+                [](error_code ec, const dsn::blob &value) {
+                    CHECK_EQ(ERR_OK, ec);
+                    dsn::binary_reader reader(value);
+                    int read_value = 0;
+                    reader.read(read_value);
+                    CHECK_EQ(0xbeefdead, read_value);
+                },
+                nullptr)
             ->wait();
     }
     // clean the node created in previous code-block, to support test in next 
round
     {
-        service->delete_node("/1", false, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok)
+        service
+            ->delete_node("/1", false, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok, nullptr)
             ->wait();
     }
 
@@ -158,7 +184,7 @@ void provider_basic_test(const service_creator_func 
&service_creator,
         entries->delete_node("/2/3");
 
         auto tsk = service->submit_transaction(
-            entries, META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok);
+            entries, META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok, 
nullptr);
         tsk->wait();
         for (unsigned int i = 0; i < 5; ++i) {
             ASSERT_EQ(ERR_OK, entries->get_result(i));
@@ -171,11 +197,13 @@ void provider_basic_test(const service_creator_func 
&service_creator,
         entries->delete_node("/2"); // delete a non empty dir
         entries->create_node("/5");
 
-        service->submit_transaction(entries, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_err)
+        service
+            ->submit_transaction(
+                entries, META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_err, 
nullptr)
             ->wait();
-        error_code err[4] = {ERR_OK, ERR_OK, ERR_INVALID_PARAMETERS, 
ERR_INCONSISTENT_STATE};
-        for (unsigned int i = 0; i < 4; ++i) {
-            ASSERT_EQ(err[i], entries->get_result(i));
+        std::array errors = {ERR_OK, ERR_OK, ERR_INVALID_PARAMETERS, 
ERR_INCONSISTENT_STATE};
+        for (unsigned int i = 0; i < errors.size(); ++i) {
+            ASSERT_EQ(errors.at(i), entries->get_result(i));
         }
 
         // another invalid transaction
@@ -186,11 +214,13 @@ void provider_basic_test(const service_creator_func 
&service_creator,
         // although this is also invalid, but ignored due to previous one has 
stop the transaction
         entries->set_data("/5", writer.get_buffer());
 
-        err[2] = ERR_OBJECT_NOT_FOUND;
-        service->submit_transaction(entries, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_err)
+        errors[2] = ERR_OBJECT_NOT_FOUND;
+        service
+            ->submit_transaction(
+                entries, META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_err, 
nullptr)
             ->wait();
-        for (unsigned int i = 0; i < 4; ++i) {
-            ASSERT_EQ(err[i], entries->get_result(i));
+        for (unsigned int i = 0; i < errors.size(); ++i) {
+            ASSERT_EQ(errors.at(i), entries->get_result(i));
         }
     }
 
@@ -200,25 +230,29 @@ void provider_basic_test(const service_creator_func 
&service_creator,
         service = service_creator();
 
         service
-            ->get_children("/2",
-                           META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
-                           [](error_code ec, const std::vector<std::string> 
&children) {
-                               CHECK_EQ(ERR_OK, ec);
-                               CHECK_EQ(1, children.size());
-                               CHECK_EQ("2", children[0]);
-                           })
+            ->get_children(
+                "/2",
+                META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
+                [](error_code ec, const std::vector<std::string> &children) {
+                    CHECK_EQ(ERR_OK, ec);
+                    CHECK_EQ(1, children.size());
+                    CHECK_EQ("2", children[0]);
+                },
+                nullptr)
             ->wait();
 
         service
-            ->get_data("/2",
-                       META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
-                       [](error_code ec, const blob &value) {
-                           CHECK_EQ(ERR_OK, ec);
-                           binary_reader reader(value);
-                           int content_value;
-                           reader.read(content_value);
-                           CHECK_EQ(0xdeadbeef, content_value);
-                       })
+            ->get_data(
+                "/2",
+                META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
+                [](error_code ec, const blob &value) {
+                    CHECK_EQ(ERR_OK, ec);
+                    binary_reader reader(value);
+                    int content_value{0};
+                    reader.read(content_value);
+                    CHECK_EQ(0xdeadbeef, content_value);
+                },
+                nullptr)
             ->wait();
     }
 
@@ -228,7 +262,9 @@ void provider_basic_test(const service_creator_func 
&service_creator,
         entries->delete_node("/2/2");
         entries->delete_node("/2");
 
-        service->submit_transaction(entries, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok)
+        service
+            ->submit_transaction(
+                entries, META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok, 
nullptr)
             ->wait();
 
         for (unsigned int i = 0; i < 2; ++i) {
@@ -271,10 +307,12 @@ void provider_recursively_create_delete_test(const 
service_creator_func &creator
     dsn::task_tracker tracker;
 
     service
-        ->delete_node("/r",
-                      true,
-                      META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
-                      [](error_code ec) { LOG_INFO("result: {}", ec); })
+        ->delete_node(
+            "/r",
+            true,
+            META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
+            [](error_code ec) { LOG_INFO("result: {}", ec); },
+            nullptr)
         ->wait();
     service->create_node(
         "/r",
diff --git a/src/ranger/ranger_resource_policy_manager.cpp 
b/src/ranger/ranger_resource_policy_manager.cpp
index f36b739ff..62b89dc26 100644
--- a/src/ranger/ranger_resource_policy_manager.cpp
+++ b/src/ranger/ranger_resource_policy_manager.cpp
@@ -502,7 +502,7 @@ void 
ranger_resource_policy_manager::start_to_dump_and_sync_policies()
     LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
     dsn::task_ptr sync_task = dsn::tasking::create_task(
         LPC_USE_RANGER_ACCESS_CONTROL, &_tracker, [this]() { 
dump_and_sync_policies(); });
-    _meta_svc->get_remote_storage()->create_node(
+    _meta_svc->get_remote_storage()->create_empty_node(
         _ranger_policy_meta_root,
         LPC_USE_RANGER_ACCESS_CONTROL,
         [this, sync_task](dsn::error_code err) {
@@ -541,7 +541,10 @@ void 
ranger_resource_policy_manager::dump_policies_to_remote_storage()
 {
     dsn::blob value = 
json::json_forwarder<all_resource_policies>::encode(_all_resource_policies);
     _meta_svc->get_remote_storage()->set_data(
-        _ranger_policy_meta_root, value, LPC_USE_RANGER_ACCESS_CONTROL, 
[this](dsn::error_code e) {
+        _ranger_policy_meta_root,
+        value,
+        LPC_USE_RANGER_ACCESS_CONTROL,
+        [this](dsn::error_code e) {
             if (e == dsn::ERR_OK) {
                 LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
                 return;
@@ -555,7 +558,8 @@ void 
ranger_resource_policy_manager::dump_policies_to_remote_storage()
                 [this]() { dump_policies_to_remote_storage(); },
                 0,
                 kLoadRangerPolicyRetryDelayMs);
-        });
+        },
+        nullptr);
 }
 
 void ranger_resource_policy_manager::update_cached_policies()
diff --git a/src/zookeeper/distributed_lock_service_zookeeper.cpp 
b/src/zookeeper/distributed_lock_service_zookeeper.cpp
index 868b5359e..f6ed6313e 100644
--- a/src/zookeeper/distributed_lock_service_zookeeper.cpp
+++ b/src/zookeeper/distributed_lock_service_zookeeper.cpp
@@ -24,6 +24,7 @@
  * THE SOFTWARE.
  */
 
+#include <fmt/core.h>
 #include <zookeeper/zookeeper.h>
 #include <functional>
 #include <memory>
@@ -110,14 +111,14 @@ error_code 
distributed_lock_service_zookeeper::initialize(const std::vector<std:
 
     std::vector<std::string> slices;
     utils::split_args(lock_root, slices, '/');
-    std::string current = "";
-    for (auto &str : slices) {
+    std::string current;
+    for (const auto &str : slices) {
         utils::notify_event e;
-        int zerr;
-        current = current + "/" + str;
+        int zerr{0};
+        current += fmt::format("/{}", str);
         zookeeper_session::zoo_opcontext *op = 
zookeeper_session::create_context();
         op->_optype = zookeeper_session::ZOO_CREATE;
-        op->_input._path = current;
+        op->_input._path = std::make_shared<std::string>(current);
         op->_callback_function = [&e, &zerr](zookeeper_session::zoo_opcontext 
*op) mutable {
             zerr = op->_output.error;
             e.notify();
diff --git a/src/zookeeper/lock_struct.cpp b/src/zookeeper/lock_struct.cpp
index 2f0c0073d..2f6b2e721 100644
--- a/src/zookeeper/lock_struct.cpp
+++ b/src/zookeeper/lock_struct.cpp
@@ -31,13 +31,14 @@
 #include <memory>
 #include <string>
 #include <string_view>
-#include <type_traits>
 #include <utility>
 #include <vector>
 
 #include "distributed_lock_service_zookeeper.h"
+#include "fmt/core.h"
 #include "lock_struct.h"
 #include "lock_types.h"
+#include "runtime/api_task.h"
 #include "task/async_calls.h"
 #include "task/task.h"
 #include "utils/blob.h"
@@ -67,6 +68,11 @@ bool is_zookeeper_timeout(int zookeeper_error)
     return zookeeper_error == ZCONNECTIONLOSS || zookeeper_error == 
ZOPERATIONTIMEOUT;
 }
 
+void enqueue(task_handler &&callback, lock_struct_ptr _this)
+{
+    tasking::enqueue(TASK_CODE_DLOCK, nullptr, std::move(callback), 
_this->hash());
+}
+
 } // anonymous namespace
 
 #define CHECK_CODE(code, allow_list, code_str)                                 
                    \
@@ -80,12 +86,10 @@ bool is_zookeeper_timeout(int zookeeper_error)
         CHECK_LT_MSG(i, allow_list.size(), "invalid code({})", code_str);      
                    \
     } while (0)
 
-#define EXECUTE(cb, _this) tasking::enqueue(TASK_CODE_DLOCK, nullptr, cb, 
_this->hash())
-
 #define ADD_REF_AND_DELAY_CALL(op, _this)                                      
                    \
     LOG_WARNING("operation {} on {} encounter error, retry later",             
                    \
                 zookeeper_session::string_zoo_operation(op->_optype),          
                    \
-                op->_input._path);                                             
                    \
+                *op->_input._path);                                            
                    \
     zookeeper_session::add_ref(op);                                            
                    \
     tasking::enqueue(                                                          
                    \
         TASK_CODE_DLOCK,                                                       
                    \
@@ -282,18 +286,17 @@ void lock_struct::remove_duplicated_locknode(std::string 
&&znode_path)
             return;
         }
 
-        EXECUTE(
-            std::bind(&lock_struct::after_remove_duplicated_locknode,
-                      _this,
-                      op->_output.error,
-                      std::shared_ptr<std::string>(new 
std::string(std::move(op->_input._path)))),
+        enqueue(
+            [_this, err = op->_output.error, path = op->_input._path]() {
+                lock_struct::after_remove_duplicated_locknode(_this, err, 
path);
+            },
             _this);
     };
 
     zookeeper_session::zoo_opcontext *op = zookeeper_session::create_context();
     op->_optype = zookeeper_session::ZOO_DELETE;
     op->_callback_function = delete_callback_wrapper;
-    op->_input._path = std::move(znode_path);
+    op->_input._path = std::make_shared<std::string>(std::move(znode_path));
     _dist_lock_service->session()->visit(op);
 }
 
@@ -405,11 +408,11 @@ void lock_struct::get_lock_owner(bool watch_myself)
         LOG_INFO("get watcher callback, event type({})",
                  zookeeper_session::string_zoo_event(event));
         if (watch_myself) {
-            EXECUTE(std::bind(&lock_struct::my_lock_removed, _this, event), 
_this);
+            enqueue([_this, event]() { lock_struct::my_lock_removed(_this, 
event); }, _this);
             return;
         }
 
-        EXECUTE(std::bind(&lock_struct::owner_change, _this, event), _this);
+        enqueue([_this, event]() { lock_struct::owner_change(_this, event); }, 
_this);
     };
 
     auto after_get_owner_wrapper = [_this, 
watch_myself](zookeeper_session::zoo_opcontext *op) {
@@ -435,19 +438,20 @@ void lock_struct::get_lock_owner(bool watch_myself)
         }
 
         if (output.error != ZOK) {
-            EXECUTE(std::bind(cb, output.error, nullptr), _this);
+            enqueue([cb, err = output.error]() { cb(err, nullptr); }, _this);
             return;
         }
 
         std::shared_ptr<std::string> buf(
             new std::string(output.get_op.value, output.get_op.value_length));
-        EXECUTE(std::bind(cb, ZOK, buf), _this);
+        enqueue([cb, buf]() { cb(ZOK, buf); }, _this);
     };
 
     zookeeper_session::zoo_opcontext *op = zookeeper_session::create_context();
     op->_optype = zookeeper_session::ZOO_GET;
     op->_callback_function = after_get_owner_wrapper;
-    op->_input._path = _lock_dir + "/" + _owner._node_seq_name;
+    op->_input._path =
+        std::make_shared<std::string>(fmt::format("{}/{}", _lock_dir, 
_owner._node_seq_name));
 
     op->_input._is_set_watch = 1;
     op->_input._owner = this;
@@ -552,8 +556,10 @@ void lock_struct::get_lockdir_nodes()
         }
 
         if (op->_output.error != ZOK) {
-            EXECUTE(
-                std::bind(&lock_struct::after_get_lockdir_nodes, _this, 
op->_output.error, nullptr),
+            enqueue(
+                [_this, err = op->_output.error]() {
+                    lock_struct::after_get_lockdir_nodes(_this, err, nullptr);
+                },
                 _this);
             return;
         }
@@ -565,15 +571,16 @@ void lock_struct::get_lockdir_nodes()
             (*children)[i].assign(vec->data[i]);
         }
 
-        EXECUTE(
-            std::bind(&lock_struct::after_get_lockdir_nodes, _this, 
op->_output.error, children),
-            _this);
+        enqueue([_this,
+                 err = op->_output.error,
+                 children]() { lock_struct::after_get_lockdir_nodes(_this, 
err, children); },
+                _this);
     };
 
     zookeeper_session::zoo_opcontext *op = zookeeper_session::create_context();
     op->_optype = zookeeper_session::ZOO_GETCHILDREN;
     op->_callback_function = result_wrapper;
-    op->_input._path = _lock_dir;
+    op->_input._path = std::make_shared<std::string>(_lock_dir);
     op->_input._is_set_watch = 0;
     _dist_lock_service->session()->visit(op);
 }
@@ -637,20 +644,23 @@ void lock_struct::create_locknode()
         }
 
         if (op->_output.error != ZOK) {
-            EXECUTE(
-                std::bind(&lock_struct::after_create_locknode, _this, 
op->_output.error, nullptr),
+            enqueue(
+                [_this, err = op->_output.error]() {
+                    lock_struct::after_create_locknode(_this, err, nullptr);
+                },
                 _this);
             return;
         }
 
         std::shared_ptr<std::string> path(new 
std::string(op->_output.create_op._created_path));
-        EXECUTE(std::bind(&lock_struct::after_create_locknode, _this, ZOK, 
path), _this);
+        enqueue([_this, path]() { lock_struct::after_create_locknode(_this, 
ZOK, path); }, _this);
     };
 
     zookeeper_session::zoo_input &input = op->_input;
-    input._path = _lock_dir + "/" + 
distributed_lock_service_zookeeper::LOCK_NODE_PREFIX;
+    input._path = std::make_shared<std::string>(
+        fmt::format("{}/{}", _lock_dir, 
distributed_lock_service_zookeeper::LOCK_NODE_PREFIX));
     input._value.assign(_myself._node_value.c_str(), 0, 
_myself._node_value.length());
-    input._flags = ZOO_EPHEMERAL | ZOO_SEQUENCE;
+    input._flags = ZOO_EPHEMERAL_SEQUENTIAL;
     op->_callback_function = result_wrapper;
     _dist_lock_service->session()->visit(op);
 }
@@ -677,7 +687,7 @@ void lock_struct::after_create_lockdir(lock_struct_ptr 
_this, int ec)
     }
 
     if (ZINVALIDSTATE == ec) {
-        LOG_ERROR("create lock dir failed got session expire, _path({})", 
_this->_lock_dir);
+        LOG_ERROR("create lock dir {} failed got session expire", 
_this->_lock_dir);
         _this->_lock_dir.clear();
         _this->on_expire();
         return;
@@ -710,11 +720,13 @@ void lock_struct::try_lock(lock_struct_ptr _this,
                 return;
             }
 
-            EXECUTE(std::bind(&lock_struct::after_create_lockdir, _this, 
op->_output.error), _this);
+            enqueue([_this,
+                     err = op->_output.error]() { 
lock_struct::after_create_lockdir(_this, err); },
+                    _this);
         };
         zookeeper_session::zoo_opcontext *op = 
zookeeper_session::create_context();
         op->_optype = zookeeper_session::ZOO_CREATE;
-        op->_input._path = _this->_lock_dir;
+        op->_input._path = std::make_shared<std::string>(_this->_lock_dir);
         op->_callback_function = result_wrapper;
         _this->_dist_lock_service->session()->visit(op);
         return;
@@ -757,8 +769,8 @@ void lock_struct::after_remove_my_locknode(lock_struct_ptr 
_this, int ec, bool r
         dsn_ec = ERR_INVALID_STATE;
     } else {
         if (ZINVALIDSTATE == ec) {
-            _this->on_expire(); // when expire, only expire_callback is 
called, the unlock/cancel's
-                                // callback is ignored
+            _this->on_expire(); // when expire, only expire_callback is 
called, the
+                                // unlock/cancel's callback is ignored
             return;
         }
 
@@ -788,23 +800,23 @@ void lock_struct::remove_my_locknode(std::string 
&&znode_path,
     lock_struct_ptr _this = this;
     auto result_wrapper =
         [_this, ignore_callback, 
remove_for_unlock](zookeeper_session::zoo_opcontext *op) {
-            LOG_INFO("delete node {}, result({})", op->_input._path, 
zerror(op->_output.error));
+            LOG_INFO("delete node {}, result({})", *op->_input._path, 
zerror(op->_output.error));
             if (is_zookeeper_timeout(op->_output.error)) {
                 ADD_REF_AND_DELAY_CALL(op, _this);
                 return;
             }
 
             if (!ignore_callback) {
-                EXECUTE(std::bind(&lock_struct::after_remove_my_locknode,
-                                  _this,
-                                  op->_output.error,
-                                  remove_for_unlock),
-                        _this);
+                enqueue(
+                    [_this, err = op->_output.error, remove_for_unlock]() {
+                        lock_struct::after_remove_my_locknode(_this, err, 
remove_for_unlock);
+                    },
+                    _this);
             }
         };
     zookeeper_session::zoo_opcontext *op = zookeeper_session::create_context();
     op->_optype = zookeeper_session::ZOO_DELETE;
-    op->_input._path = std::move(znode_path);
+    op->_input._path = std::make_shared<std::string>(std::move(znode_path));
     op->_callback_function = result_wrapper;
     _dist_lock_service->session()->visit(op);
 }
diff --git a/src/zookeeper/test/zookeeper_session_test_base.cpp 
b/src/zookeeper/test/zookeeper_session_test_base.cpp
index 1d331691e..ae01f53f5 100644
--- a/src/zookeeper/test/zookeeper_session_test_base.cpp
+++ b/src/zookeeper/test/zookeeper_session_test_base.cpp
@@ -21,6 +21,7 @@
 #include <atomic>
 #include <iostream>
 #include <iterator>
+#include <memory>
 #include <utility>
 
 #include "gmock/gmock.h"
@@ -89,7 +90,7 @@ void ZookeeperSessionTestBase::operate_node(
 
     auto *op = zookeeper_session::create_context();
     op->_optype = op_type;
-    op->_input._path = path;
+    op->_input._path = std::make_shared<std::string>(path);
     op->_input._value = blob::create_from_bytes(std::string(data));
     op->_callback_function =
         [&zerr, &callback, &on_completed](zookeeper_session::zoo_opcontext 
*op) mutable {
@@ -223,7 +224,7 @@ void ZookeeperSessionTestBase::get_sub_nodes(const 
std::string &path,
 
     auto *op = zookeeper_session::create_context();
     op->_optype = zookeeper_session::ZOO_GETCHILDREN;
-    op->_input._path = path;
+    op->_input._path = std::make_shared<std::string>(path);
     op->_callback_function =
         [&zerr, &sub_nodes, &on_completed](zookeeper_session::zoo_opcontext 
*op) mutable {
             zerr = op->_output.error;
diff --git a/src/zookeeper/zookeeper_session.cpp 
b/src/zookeeper/zookeeper_session.cpp
index 3e264f4f6..5bc728d5a 100644
--- a/src/zookeeper/zookeeper_session.cpp
+++ b/src/zookeeper/zookeeper_session.cpp
@@ -341,7 +341,7 @@ int zookeeper_session::attach(void *callback_owner, const 
state_callback &cb)
     }
 
     _watchers.emplace_back();
-    _watchers.back().watcher_path = "";
+    _watchers.back().watcher_path = std::make_shared<std::string>();
     _watchers.back().callback_owner = callback_owner;
     _watchers.back().watcher_callback = cb;
 
@@ -366,15 +366,16 @@ void zookeeper_session::dispatch_event(int type, int 
zstate, const char *path)
 
         std::for_each(
             _watchers.begin(), _watchers.end(), [path, ret_code](const 
watcher_object &obj) {
-                if (obj.watcher_path == path)
+                if (*obj.watcher_path == path) {
                     obj.watcher_callback(ret_code);
+                }
             });
     }
     {
         if (ZOO_SESSION_EVENT != type) {
             utils::auto_write_lock l(_watcher_lock);
             _watchers.remove_if(
-                [path](const watcher_object &obj) { return obj.watcher_path == 
path; });
+                [path](const watcher_object &obj) { return *obj.watcher_path 
== path; });
         }
     }
 }
@@ -398,50 +399,56 @@ void zookeeper_session::visit(zoo_opcontext *ctx)
         _watchers.back().watcher_callback = 
std::move(ctx->_input._watcher_callback);
     };
 
-    // TODO: the read ops from zookeeper might get the staled data, need to fix
+    // TODO(clang-tidy): the read ops from zookeeper might get the staled 
data, need to fix
     int ec = ZOK;
     zoo_input &input = ctx->_input;
-    const char *path = input._path.c_str();
+
+    // A local variable is needed here to hold a reference to `_path` to 
prevent it from
+    // being freed in the thread executing the completion callback, which 
could otherwise
+    // lead to a dangling pointer issue.
+    const std::shared_ptr<std::string> path(input._path);
+
     switch (ctx->_optype) {
     case ZOO_CREATE:
         ec = zoo_acreate(_handle,
-                         path,
+                         path->c_str(),
                          input._value.data(),
-                         input._value.length(),
+                         static_cast<int>(input._value.length()),
                          &ZOO_OPEN_ACL_UNSAFE,
                          ctx->_input._flags,
                          global_string_completion,
-                         (const void *)ctx);
+                         ctx);
         break;
     case ZOO_DELETE:
-        ec = zoo_adelete(_handle, path, -1, global_void_completion, (const 
void *)ctx);
+        ec = zoo_adelete(_handle, path->c_str(), -1, global_void_completion, 
ctx);
         break;
     case ZOO_EXISTS:
-        if (1 == input._is_set_watch)
+        if (1 == input._is_set_watch) {
             add_watch_object();
-        ec = zoo_aexists(
-            _handle, path, input._is_set_watch, global_state_completion, 
(const void *)ctx);
+        }
+        ec = zoo_aexists(_handle, path->c_str(), input._is_set_watch, 
global_state_completion, ctx);
         break;
     case ZOO_GET:
-        if (1 == input._is_set_watch)
+        if (1 == input._is_set_watch) {
             add_watch_object();
-        ec =
-            zoo_aget(_handle, path, input._is_set_watch, 
global_data_completion, (const void *)ctx);
+        }
+        ec = zoo_aget(_handle, path->c_str(), input._is_set_watch, 
global_data_completion, ctx);
         break;
     case ZOO_SET:
         ec = zoo_aset(_handle,
-                      path,
+                      path->c_str(),
                       input._value.data(),
-                      input._value.length(),
+                      static_cast<int>(input._value.length()),
                       -1,
                       global_state_completion,
-                      (const void *)ctx);
+                      ctx);
         break;
     case ZOO_GETCHILDREN:
-        if (1 == input._is_set_watch)
+        if (1 == input._is_set_watch) {
             add_watch_object();
+        }
         ec = zoo_aget_children(
-            _handle, path, input._is_set_watch, global_strings_completion, 
(const void *)ctx);
+            _handle, path->c_str(), input._is_set_watch, 
global_strings_completion, ctx);
         break;
     case ZOO_TRANSACTION:
         ec = zoo_amulti(_handle,
@@ -449,7 +456,7 @@ void zookeeper_session::visit(zoo_opcontext *ctx)
                         input._pkt->_ops,
                         input._pkt->_results,
                         global_void_completion,
-                        (const void *)ctx);
+                        ctx);
         break;
     default:
         break;
@@ -490,7 +497,7 @@ void zookeeper_session::global_watcher(
 }
 
 #define COMPLETION_INIT(rc, data)                                              
                    \
-    zoo_opcontext *op_ctx = (zoo_opcontext *)data;                             
                    \
+    auto *op_ctx = static_cast<zoo_opcontext *>(const_cast<void *>(data));     
                    \
     op_ctx->_priv_session_ref->init_non_dsn_thread();                          
                    \
     zoo_output &output = op_ctx->_output;                                      
                    \
     output.error = rc
@@ -498,9 +505,10 @@ void zookeeper_session::global_watcher(
 void zookeeper_session::global_string_completion(int rc, const char *name, 
const void *data)
 {
     COMPLETION_INIT(rc, data);
-    LOG_DEBUG("rc({}), input path({})", zerror(rc), op_ctx->_input._path);
-    if (ZOK == rc && name != nullptr)
+    LOG_DEBUG("rc({}), input path({})", zerror(rc), *op_ctx->_input._path);
+    if (ZOK == rc && name != nullptr) {
         LOG_DEBUG("created path: {}", name);
+    }
     output.create_op._created_path = name;
     op_ctx->_callback_function(op_ctx);
     release_ref(op_ctx);
@@ -510,7 +518,7 @@ void zookeeper_session::global_data_completion(
     int rc, const char *value, int value_length, const Stat *, const void 
*data)
 {
     COMPLETION_INIT(rc, data);
-    LOG_DEBUG("rc({}), input path({})", zerror(rc), op_ctx->_input._path);
+    LOG_DEBUG("rc({}), input path({})", zerror(rc), *op_ctx->_input._path);
     output.get_op.value_length = value_length;
     output.get_op.value = value;
     op_ctx->_callback_function(op_ctx);
@@ -520,7 +528,7 @@ void zookeeper_session::global_data_completion(
 void zookeeper_session::global_state_completion(int rc, const Stat *stat, 
const void *data)
 {
     COMPLETION_INIT(rc, data);
-    LOG_DEBUG("rc({}), input path({})", zerror(rc), op_ctx->_input._path);
+    LOG_DEBUG("rc({}), input path({})", zerror(rc), *op_ctx->_input._path);
     if (op_ctx->_optype == ZOO_EXISTS) {
         output.exists_op._node_stat = stat;
         op_ctx->_callback_function(op_ctx);
@@ -536,9 +544,10 @@ void zookeeper_session::global_strings_completion(int rc,
                                                   const void *data)
 {
     COMPLETION_INIT(rc, data);
-    LOG_DEBUG("rc({}), input path({})", zerror(rc), op_ctx->_input._path);
-    if (rc == ZOK && strings != nullptr)
+    LOG_DEBUG("rc({}), input path({})", zerror(rc), *op_ctx->_input._path);
+    if (rc == ZOK && strings != nullptr) {
         LOG_DEBUG("child count: {}", strings->count);
+    }
     output.getchildren_op.strings = strings;
     op_ctx->_callback_function(op_ctx);
     release_ref(op_ctx);
@@ -547,12 +556,14 @@ void zookeeper_session::global_strings_completion(int rc,
 void zookeeper_session::global_void_completion(int rc, const void *data)
 {
     COMPLETION_INIT(rc, data);
-    if (op_ctx->_optype == ZOO_DELETE)
-        LOG_DEBUG("rc({}), input path({})", zerror(rc), op_ctx->_input._path);
-    else
+    if (op_ctx->_optype == ZOO_DELETE) {
+        LOG_DEBUG("rc({}), input path({})", zerror(rc), *op_ctx->_input._path);
+    } else {
         LOG_DEBUG("rc({})", zerror(rc));
+    }
     op_ctx->_callback_function(op_ctx);
     release_ref(op_ctx);
 }
+
 } // namespace dist
 } // namespace dsn
diff --git a/src/zookeeper/zookeeper_session.h 
b/src/zookeeper/zookeeper_session.h
index 5fcaa9f9a..2dd602cbe 100644
--- a/src/zookeeper/zookeeper_session.h
+++ b/src/zookeeper/zookeeper_session.h
@@ -82,20 +82,22 @@ public:
 
     struct zoo_input
     {
-        std::string _path;
+        std::shared_ptr<std::string> _path;
 
-        /* for create and set */
+        // For create and set.
         blob _value;
-        /* for create */
-        int _flags;
-        /* for get/exists/get_children */
-        int _is_set_watch;
 
-        /* for watcher callback */
-        void *_owner;
+        // For create.
+        int _flags{0};
+
+        // For get/exists/get_children.
+        int _is_set_watch{0};
+
+        // For watcher callback.
+        void *_owner{nullptr};
         std::function<void(int)> _watcher_callback;
 
-        /* for multi-op transaction */
+        // For multi-op transaction.
         std::shared_ptr<zoo_atomic_packet> _pkt;
     };
 
@@ -180,7 +182,7 @@ private:
 
     struct watcher_object
     {
-        std::string watcher_path;
+        std::shared_ptr<std::string> watcher_path;
         void *callback_owner;
         state_callback watcher_callback;
     };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to