This is an automated email from the ASF dual-hosted git repository. adar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 676a765f2f8744bd7273b7358ab3a8b22c148edb Author: Adar Dembo <[email protected]> AuthorDate: Sat Mar 28 01:46:12 2020 -0700 fs: remove kudu::Bind usage from DataDir closures Change-Id: Id58b8740ccc33762383a48680c726e8d30e7f25c Reviewed-on: http://gerrit.cloudera.org:8080/15581 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/fs/dir_manager.cc | 10 +++++----- src/kudu/fs/dir_manager.h | 4 ++-- src/kudu/fs/file_block_manager.cc | 12 ++++++------ src/kudu/fs/log_block_manager.cc | 39 ++++++++++++++++++++------------------- 4 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc index c76f773..fd8a81f 100644 --- a/src/kudu/fs/dir_manager.cc +++ b/src/kudu/fs/dir_manager.cc @@ -34,7 +34,6 @@ #include "kudu/fs/dir_util.h" #include "kudu/fs/fs.pb.h" -#include "kudu/gutil/bind.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" #include "kudu/gutil/strings/join.h" @@ -103,12 +102,12 @@ void Dir::Shutdown() { is_shutdown_ = true; } -void Dir::ExecClosure(const Closure& task) { - Status s = pool_->Submit([task]() { task.Run(); }); +void Dir::ExecClosure(const std::function<void()>& task) { + Status s = pool_->Submit(task); if (!s.ok()) { WARN_NOT_OK( s, "Could not submit task to thread pool, running it synchronously"); - task.Run(); + task(); } } @@ -577,7 +576,8 @@ Status DirManager::Open() { // Use the per-dir thread pools to delete temporary files in parallel. for (const auto& dir : dirs) { if (dir->instance()->healthy()) { - dir->ExecClosure(Bind(&DeleteTmpFilesRecursively, env_, dir->dir())); + auto* d = dir.get(); + dir->ExecClosure([this, d]() { DeleteTmpFilesRecursively(this->env_, d->dir()); }); } } for (const auto& dir : dirs) { diff --git a/src/kudu/fs/dir_manager.h b/src/kudu/fs/dir_manager.h index e1beae0..d162ae8 100644 --- a/src/kudu/fs/dir_manager.h +++ b/src/kudu/fs/dir_manager.h @@ -19,6 +19,7 @@ #include <stdint.h> +#include <functional> #include <memory> #include <mutex> #include <set> @@ -26,7 +27,6 @@ #include <unordered_map> #include <vector> -#include "kudu/gutil/callback.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" #include "kudu/util/locks.h" @@ -109,7 +109,7 @@ class Dir { // // Normally the task is performed asynchronously. However, if submission to // the pool fails, it runs synchronously on the current thread. - void ExecClosure(const Closure& task); + void ExecClosure(const std::function<void()>& task); // Waits for any outstanding closures submitted via ExecClosure() to finish. void WaitOnClosures(); diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc index ae8f1bc..2e631cb 100644 --- a/src/kudu/fs/file_block_manager.cc +++ b/src/kudu/fs/file_block_manager.cc @@ -37,7 +37,6 @@ #include "kudu/fs/dir_manager.h" #include "kudu/fs/error_manager.h" #include "kudu/fs/fs_report.h" -#include "kudu/gutil/bind.h" #include "kudu/gutil/casts.h" #include "kudu/gutil/integral_types.h" #include "kudu/gutil/map-util.h" @@ -943,11 +942,12 @@ Status FileBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) { vector<vector<BlockId>> block_id_vecs(dds.size()); vector<Status> statuses(dds.size()); for (int i = 0; i < dds.size(); i++) { - dds[i]->ExecClosure(Bind(&GetAllBlockIdsForDir, - env_, - dds[i].get(), - &block_id_vecs[i], - &statuses[i])); + auto* dd = dds[i].get(); + auto* bid_vec = &block_id_vecs[i]; + auto* s = &statuses[i]; + dds[i]->ExecClosure([this, dd, bid_vec, s]() { + GetAllBlockIdsForDir(this->env_, dd, bid_vec, s); + }); } for (const auto& dd : dd_manager_->dirs()) { dd->WaitOnClosures(); diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc index cb40778..945d53f 100644 --- a/src/kudu/fs/log_block_manager.cc +++ b/src/kudu/fs/log_block_manager.cc @@ -22,6 +22,7 @@ #include <algorithm> #include <cstddef> #include <cstdint> +#include <functional> #include <map> #include <memory> #include <mutex> @@ -44,9 +45,6 @@ #include "kudu/fs/error_manager.h" #include "kudu/fs/fs.pb.h" #include "kudu/fs/fs_report.h" -#include "kudu/gutil/bind.h" -#include "kudu/gutil/bind_helpers.h" -#include "kudu/gutil/callback.h" #include "kudu/gutil/casts.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" @@ -502,7 +500,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> { // // Normally the task is performed asynchronously. However, if submission to // the pool fails, it runs synchronously on the current thread. - void ExecClosure(const Closure& task); + void ExecClosure(const std::function<void()>& task); // Produces a debug-friendly string representation of this container. string ToString() const; @@ -1337,7 +1335,7 @@ void LogBlockContainer::BlockDeleted(const LogBlockRefPtr& block) { live_blocks_.IncrementBy(-1); } -void LogBlockContainer::ExecClosure(const Closure& task) { +void LogBlockContainer::ExecClosure(const std::function<void()>& task) { data_dir_->ExecClosure(task); } @@ -1496,11 +1494,11 @@ LogBlockDeletionTransaction::~LogBlockDeletionTransaction() { Substitute("could not coalesce hole punching for container: $0", container->ToString())); + scoped_refptr<LogBlockContainer> self(container); for (const auto& interval : entry.second) { - container->ExecClosure(Bind(&LogBlockContainer::ContainerDeletionAsync, - container, - interval.first, - interval.second - interval.first)); + container->ExecClosure([self, interval]() { + self->ContainerDeletionAsync(interval.first, interval.second - interval.first); + }); } } } @@ -2081,12 +2079,12 @@ Status LogBlockManager::Open(FsReport* report) { } // Open the data dir asynchronously. - dd->ExecClosure( - Bind(&LogBlockManager::OpenDataDir, - Unretained(this), - dd.get(), - &container_results[i], - &statuses[i])); + auto* dd_raw = dd.get(); + auto* results = &container_results[i]; + auto* s = &statuses[i]; + dd->ExecClosure([this, dd_raw, results, s]() { + this->OpenDataDir(dd_raw, results, s); + }); } // Wait for the opens to complete. @@ -2136,8 +2134,9 @@ Status LogBlockManager::Open(FsReport* report) { } if (do_repair) { dir_results[i] = std::move(dir_result); - dd->ExecClosure(Bind(&LogBlockManager::RepairTask, Unretained(this), - dd.get(), Unretained(dir_results[i].get()))); + auto* dd_raw = dd.get(); + auto* dr = dir_results[i].get(); + dd->ExecClosure([this, dd_raw, dr]() { this->RepairTask(dd_raw, dr); }); } } @@ -2558,8 +2557,10 @@ void LogBlockManager::OpenDataDir( } // Load the container's records asynchronously. - dir->ExecClosure(Bind(&LogBlockManager::LoadContainer, Unretained(this), - dir, container, Unretained(results->back().get()))); + auto* r = results->back().get(); + dir->ExecClosure([this, dir, container, r]() { + this->LoadContainer(dir, container, r); + }); } }
