This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 56ce1ad KUDU-1728 parallelize download blocks in tablet-copy-client
56ce1ad is described below
commit 56ce1ad8bda24dbaed36c0f059e13a3d8e25b1d0
Author: ningw <[email protected]>
AuthorDate: Mon Aug 3 11:20:56 2020 +0800
KUDU-1728 parallelize download blocks in tablet-copy-client
Parallelize the action of 'Download blocks' in tablet-copy-client.
Previously, downloading blocks from a tablet server was executed sequentially,
so actions related to 'DownloadBlocks', such as
'recover from another tserver' and 'cluster rebalance', could sometimes be slow.
Downloading the blocks can also be slow when only one thread is used
even though bandwidth isn't the bottleneck.
This commit introduces FLAGS_tablet_copy_download_threads_nums_per_session to
control the number of threads used to download blocks within a tablet-copy-client.
Here I attached a simple benchmark; each reported number is the average over
8 runs. Results are in seconds, rounded to 2 decimal places.
The experiment downloads all data from a remote tserver to a local tserver.
Settings:
Remote machine: 8 cores 32g tserver limited to 8g, tserver version 1.10
Local machine: 8 cores 12g memory
Tserver has 3 x 500M tablets + 3 x 430M tablets + 3 x 420M tablets and
259 x 8M tablets.
All tablets were compacted well with diskrowset height 1.0.
Here I use two variables to control the experiment:
the number of tablet-copy-client threads, written as tc;
the number of block-download threads within each tablet-copy-client thread
(FLAGS_tablet_copy_download_threads_nums_per_session), written as bd.
The result 37.58 in column 'tc-2', row 'bd-4' can be read as:
it takes 37.58s to download all data from the tserver with 2
tablet-copy-client threads, where each tablet-copy-client thread uses 4
threads to download blocks.
All results in column tc-1 reflect the behavior before this patch.
time tc-1 tc-2 tc-4 tc-8
bd-1 76.91 48.50 34.27 32.39
bd-2 54.45 43.26 33.27 32.37
bd-4 48.36 37.58 33.31 32.30
bd-8 47.95 38.36 33.98 32.60
Change-Id: Id83abca7a38cf183d9c27d82bb8a022699079e0e
Reviewed-on: http://gerrit.cloudera.org:8080/16274
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/tserver/tablet_copy_client-test.cc | 16 ++-
src/kudu/tserver/tablet_copy_client.cc | 195 +++++++++++++++++++---------
src/kudu/tserver/tablet_copy_client.h | 35 ++++-
3 files changed, 183 insertions(+), 63 deletions(-)
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc
b/src/kudu/tserver/tablet_copy_client-test.cc
index 2f10c1b..10b3470 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -75,6 +75,7 @@ using std::thread;
using std::vector;
DECLARE_double(env_inject_eio);
+DECLARE_int32(tablet_copy_download_threads_nums_per_session);
DECLARE_string(block_manager);
DECLARE_string(env_inject_eio_globs);
@@ -212,6 +213,19 @@ TEST_F(TabletCopyClientTest, TestLifeCycle) {
ASSERT_FALSE(meta_);
}
+ // Rowset are download by multithreads, the error status can be corretly
+ // collected among all threads.
+ {
+ google::FlagSaver fs;
+ FLAGS_env_inject_eio = 0.5;
+ FLAGS_tablet_copy_download_threads_nums_per_session = 16;
+ s = StartCopy();
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Failed to write tablet metadata");
+ ASSERT_EQ(TabletCopyClient::State::kInitialized, client_->state_);
+ ASSERT_FALSE(meta_);
+ }
+
// Now let's try replacing a tablet. Set a metadata that we can replace.
ASSERT_OK(ResetTabletCopyClient());
ASSERT_OK(StartCopy());
@@ -360,7 +374,7 @@ TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) {
// test is to exemplify the difference in syncs between the log and file
// block managers, but it would be nice to formulate a bound here.
if (FLAGS_block_manager == "log") {
- ASSERT_GE(9, down_cast<Counter*>(
+ ASSERT_GE(15, down_cast<Counter*>(
metric_entity_->FindOrNull(METRIC_block_manager_total_disk_sync).get())->value());
} else {
ASSERT_GE(22, down_cast<Counter*>(
diff --git a/src/kudu/tserver/tablet_copy_client.cc
b/src/kudu/tserver/tablet_copy_client.cc
index d783565..6a16794 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -18,7 +18,9 @@
#include "kudu/tserver/tablet_copy_client.h"
#include <cstdint>
+#include <functional>
#include <memory>
+#include <mutex>
#include <ostream>
#include <utility>
@@ -68,6 +70,7 @@
#include "kudu/util/random_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
#include "kudu/util/trace.h"
DEFINE_int32(tablet_copy_begin_session_timeout_ms, 30000,
@@ -100,6 +103,11 @@ DEFINE_double(tablet_copy_fault_crash_before_write_cmeta,
0.0,
TAG_FLAG(tablet_copy_fault_crash_before_write_cmeta, unsafe);
TAG_FLAG(tablet_copy_fault_crash_before_write_cmeta, runtime);
+DEFINE_int32(tablet_copy_download_threads_nums_per_session, 4,
+ "Number of threads per tablet copy session for downloading tablet
data blocks.");
+DEFINE_validator(tablet_copy_download_threads_nums_per_session,
+ [](const char* /*n*/, int32_t v) { return v > 0; });
+
DECLARE_int32(tablet_copy_transfer_chunk_size_bytes);
METRIC_DEFINE_counter(server, tablet_copy_bytes_fetched,
@@ -130,10 +138,10 @@ using fs::BlockManager;
using fs::CreateBlockOptions;
using fs::WritableBlock;
using rpc::Messenger;
+using std::atomic;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
-using std::vector;
using strings::Substitute;
using tablet::ColumnDataPB;
using tablet::DeltaDataPB;
@@ -170,6 +178,10 @@ TabletCopyClient::TabletCopyClient(
if (tablet_copy_metrics_) {
tablet_copy_metrics_->open_client_sessions->Increment();
}
+ ThreadPoolBuilder("blocks-download-pool-" + tablet_id)
+ .set_max_threads(FLAGS_tablet_copy_download_threads_nums_per_session)
+ .set_min_threads(FLAGS_tablet_copy_download_threads_nums_per_session)
+ .Build(&blocks_download_pool_);
}
TabletCopyClient::~TabletCopyClient() {
@@ -178,6 +190,7 @@ TabletCopyClient::~TabletCopyClient() {
LogPrefix()));
WARN_NOT_OK(Abort(), Substitute("$0Failed to fully clean up tablet after
aborted copy",
LogPrefix()));
+ blocks_download_pool_->Shutdown();
if (tablet_copy_metrics_) {
tablet_copy_metrics_->open_client_sessions->IncrementBy(-1);
}
@@ -400,7 +413,9 @@ Status TabletCopyClient::FetchAll(const
scoped_refptr<TabletReplica>& tablet_rep
tablet_replica_ = tablet_replica;
- // Download all the files (serially, for now, but in parallel in the future).
+ // Download all the tablet's files.
+ // Data blocks are downloaded in parallel, where the concurrency is
+ // controlled by the --tablet_copy_download_threads_num_per_session flag.
RETURN_NOT_OK(DownloadBlocks());
RETURN_NOT_OK(DownloadWALs());
@@ -572,72 +587,109 @@ int TabletCopyClient::CountRemoteBlocks() const {
return num_blocks;
}
+void TabletCopyClient::DownloadRowset(const RowSetDataPB& src_rowset,
+ int num_remote_blocks,
+ atomic<int>* block_count,
+ Status* end_status) {
+ RowSetDataPB* dst_rowset;
+ {
+ std::lock_guard<simple_spinlock> l(simple_lock_);
+ if (!end_status->ok()) {
+ return;
+ }
+ // Create rowset.
+ dst_rowset = superblock_->add_rowsets();
+ }
+ *dst_rowset = src_rowset;
+ // Clear the data in the rowset so that we don't end up deleting the wrong
+ // blocks (using the ids of the remote blocks) if we fail.
+ // TODO(mpercy): This is pretty fragile. Consider building a class
+ // structure on top of SuperBlockPB to abstract copying details.
+ dst_rowset->clear_columns();
+ dst_rowset->clear_redo_deltas();
+ dst_rowset->clear_undo_deltas();
+ dst_rowset->clear_bloom_block();
+ dst_rowset->clear_adhoc_index_block();
+
+ // We can't leave superblock_ unserializable with unset required field
+ // values in child elements, so we must download and rewrite each block
+ // before referencing it in the rowset.
+ Status s;
+ for (const ColumnDataPB& src_col : src_rowset.columns()) {
+ BlockIdPB new_block_id;
+ s = DownloadAndRewriteBlockIfEndStatusOK(src_col.block(),
num_remote_blocks,
+ block_count, &new_block_id,
end_status);
+ if (!s.ok()) {
+ return;
+ }
+ ColumnDataPB* dst_col = dst_rowset->add_columns();
+ *dst_col = src_col;
+ *dst_col->mutable_block() = new_block_id;
+ }
+ for (const DeltaDataPB& src_redo : src_rowset.redo_deltas()) {
+ BlockIdPB new_block_id;
+ s = DownloadAndRewriteBlockIfEndStatusOK(src_redo.block(),
num_remote_blocks,
+ block_count, &new_block_id,
end_status);
+ if (!s.ok()) {
+ return;
+ }
+ DeltaDataPB* dst_redo = dst_rowset->add_redo_deltas();
+ *dst_redo = src_redo;
+ *dst_redo->mutable_block() = new_block_id;
+ }
+ for (const DeltaDataPB& src_undo : src_rowset.undo_deltas()) {
+ BlockIdPB new_block_id;
+ s = DownloadAndRewriteBlockIfEndStatusOK(src_undo.block(),
num_remote_blocks,
+ block_count, &new_block_id,
end_status);
+ if (!s.ok()) {
+ return;
+ }
+ DeltaDataPB* dst_undo = dst_rowset->add_undo_deltas();
+ *dst_undo = src_undo;
+ *dst_undo->mutable_block() = new_block_id;
+ }
+ if (src_rowset.has_bloom_block()) {
+ BlockIdPB new_block_id;
+ s = DownloadAndRewriteBlockIfEndStatusOK(src_rowset.bloom_block(),
num_remote_blocks,
+ block_count, &new_block_id,
end_status);
+ if (!s.ok()) {
+ return;
+ }
+ *dst_rowset->mutable_bloom_block() = new_block_id;
+ }
+ if (src_rowset.has_adhoc_index_block()) {
+ BlockIdPB new_block_id;
+ s = DownloadAndRewriteBlockIfEndStatusOK(src_rowset.adhoc_index_block(),
num_remote_blocks,
+ block_count, &new_block_id,
end_status);
+ if (!s.ok()) {
+ return;
+ }
+ *dst_rowset->mutable_adhoc_index_block() = new_block_id;
+ }
+}
+
Status TabletCopyClient::DownloadBlocks() {
CHECK_EQ(kStarted, state_);
// Count up the total number of blocks to download.
- int num_remote_blocks = CountRemoteBlocks();
+ const int num_remote_blocks = CountRemoteBlocks();
// Download each block, writing the new block IDs into the new superblock
// as each block downloads.
- int block_count = 0;
+ atomic<int32_t> block_count(0);
LOG_WITH_PREFIX(INFO) << "Starting download of " << num_remote_blocks << "
data blocks...";
+
+ Status end_status = Status::OK();
+ // Download rowsets in parallel.
+ // Each task downloads the blocks of the corresponding rowset sequentially.
for (const RowSetDataPB& src_rowset : remote_superblock_->rowsets()) {
- // Create rowset.
- RowSetDataPB* dst_rowset = superblock_->add_rowsets();
- *dst_rowset = src_rowset;
- // Clear the data in the rowset so that we don't end up deleting the wrong
- // blocks (using the ids of the remote blocks) if we fail.
- // TODO(mpercy): This is pretty fragile. Consider building a class
- // structure on top of SuperBlockPB to abstract copying details.
- dst_rowset->clear_columns();
- dst_rowset->clear_redo_deltas();
- dst_rowset->clear_undo_deltas();
- dst_rowset->clear_bloom_block();
- dst_rowset->clear_adhoc_index_block();
-
- // We can't leave superblock_ unserializable with unset required field
- // values in child elements, so we must download and rewrite each block
- // before referencing it in the rowset.
- for (const ColumnDataPB& src_col : src_rowset.columns()) {
- BlockIdPB new_block_id;
- RETURN_NOT_OK(DownloadAndRewriteBlock(src_col.block(), num_remote_blocks,
- &block_count, &new_block_id));
- ColumnDataPB* dst_col = dst_rowset->add_columns();
- *dst_col = src_col;
- *dst_col->mutable_block() = new_block_id;
- }
- for (const DeltaDataPB& src_redo : src_rowset.redo_deltas()) {
- BlockIdPB new_block_id;
- RETURN_NOT_OK(DownloadAndRewriteBlock(src_redo.block(),
num_remote_blocks,
- &block_count, &new_block_id));
- DeltaDataPB* dst_redo = dst_rowset->add_redo_deltas();
- *dst_redo = src_redo;
- *dst_redo->mutable_block() = new_block_id;
- }
- for (const DeltaDataPB& src_undo : src_rowset.undo_deltas()) {
- BlockIdPB new_block_id;
- RETURN_NOT_OK(DownloadAndRewriteBlock(src_undo.block(),
num_remote_blocks,
- &block_count, &new_block_id));
- DeltaDataPB* dst_undo = dst_rowset->add_undo_deltas();
- *dst_undo = src_undo;
- *dst_undo->mutable_block() = new_block_id;
- }
- if (src_rowset.has_bloom_block()) {
- BlockIdPB new_block_id;
- RETURN_NOT_OK(DownloadAndRewriteBlock(src_rowset.bloom_block(),
num_remote_blocks,
- &block_count, &new_block_id));
- *dst_rowset->mutable_bloom_block() = new_block_id;
- }
- if (src_rowset.has_adhoc_index_block()) {
- BlockIdPB new_block_id;
- RETURN_NOT_OK(DownloadAndRewriteBlock(src_rowset.adhoc_index_block(),
num_remote_blocks,
- &block_count, &new_block_id));
- *dst_rowset->mutable_adhoc_index_block() = new_block_id;
- }
+ RETURN_NOT_OK(blocks_download_pool_->Submit([&]() mutable {
+ DownloadRowset(src_rowset, num_remote_blocks, &block_count, &end_status);
+ }));
}
+ blocks_download_pool_->Wait();
- return Status::OK();
+ return end_status;
}
Status TabletCopyClient::DownloadWAL(uint64_t wal_segment_seqno) {
@@ -688,12 +740,12 @@ Status TabletCopyClient::WriteConsensusMetadata() {
Status TabletCopyClient::DownloadAndRewriteBlock(const BlockIdPB& src_block_id,
int num_blocks,
- int* block_count,
+ atomic<int32_t>* block_count,
BlockIdPB* dest_block_id) {
BlockId old_block_id(BlockId::FromPB(src_block_id));
SetStatusMessage(Substitute("Downloading block $0 ($1/$2)",
old_block_id.ToString(),
- *block_count + 1, num_blocks));
+ block_count->load() + 1 , num_blocks));
BlockId new_block_id;
RETURN_NOT_OK_PREPEND(DownloadBlock(old_block_id, &new_block_id),
"Unable to download block with id " + old_block_id.ToString());
@@ -703,12 +755,32 @@ Status TabletCopyClient::DownloadAndRewriteBlock(const
BlockIdPB& src_block_id,
return Status::OK();
}
+Status TabletCopyClient::DownloadAndRewriteBlockIfEndStatusOK(const BlockIdPB&
src_block_id,
+ int num_blocks,
+ atomic<int32_t>*
block_count,
+ BlockIdPB*
dest_block_id,
+ Status*
end_status) {
+ {
+ std::lock_guard<simple_spinlock> l(simple_lock_);
+ RETURN_NOT_OK(*end_status);
+ }
+ Status s = DownloadAndRewriteBlock(src_block_id, num_blocks, block_count,
dest_block_id);
+ if (!s.ok()) {
+ std::lock_guard<simple_spinlock> l(simple_lock_);
+ if (!s.ok() && end_status->ok()) {
+ *end_status = s;
+ }
+ }
+ return s;
+}
+
Status TabletCopyClient::DownloadBlock(const BlockId& old_block_id,
BlockId* new_block_id) {
VLOG_WITH_PREFIX(1) << "Downloading block with block_id " <<
old_block_id.ToString();
RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(), "Not downloading block for
replica");
unique_ptr<WritableBlock> block;
+ // log_block_manager uses a lock to guarantee the block_id is unique.
RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(CreateBlockOptions({
tablet_id_ }), &block),
"Unable to create new block");
@@ -721,7 +793,10 @@ Status TabletCopyClient::DownloadBlock(const BlockId&
old_block_id,
*new_block_id = block->id();
RETURN_NOT_OK_PREPEND(block->Finalize(), "Unable to finalize block");
- transaction_->AddCreatedBlock(std::move(block));
+ {
+ std::lock_guard<simple_spinlock> l(simple_lock_);
+ transaction_->AddCreatedBlock(std::move(block));
+ }
return Status::OK();
}
diff --git a/src/kudu/tserver/tablet_copy_client.h
b/src/kudu/tserver/tablet_copy_client.h
index 27ed996..52d91ba 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -16,6 +16,7 @@
// under the License.
#pragma once
+#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
@@ -25,6 +26,7 @@
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/random.h"
#include "kudu/util/status.h"
@@ -35,6 +37,7 @@ class BlockId;
class BlockIdPB;
class FsManager;
class HostPort;
+class ThreadPool;
namespace consensus {
class ConsensusMetadata;
@@ -52,14 +55,15 @@ class RpcController;
} // namespace rpc
namespace tablet {
+class RowSetDataPB;
class TabletMetadata;
class TabletReplica;
class TabletSuperBlockPB;
} // namespace tablet
namespace tserver {
-class DataIdPB;
class DataChunkPB;
+class DataIdPB;
class TabletCopyServiceProxy;
// Server-wide tablet copy metrics.
@@ -193,6 +197,15 @@ class TabletCopyClient {
// Count the number of blocks on the remote (from 'remote_superblock_').
int CountRemoteBlocks() const;
+ // A task do download blocks of the specified rowset.
+ // In case of a failure, a thread notifies others via 'end_status',
+ // so they can abort downloading of their rowsets' blocks.
+ // If all threads succeed, 'end_status' is set to Status::OK().
+ void DownloadRowset(const tablet::RowSetDataPB& src_rowset,
+ int num_remote_blocks,
+ std::atomic<int>* block_count,
+ Status* end_status);
+
// Download all blocks belonging to a tablet sequentially. Add all
// downloaded blocks to the tablet copy's transaction.
//
@@ -210,9 +223,20 @@ class TabletCopyClient {
// - 'block_count' is incremented by 1.
Status DownloadAndRewriteBlock(const BlockIdPB& src_block_id,
int num_blocks,
- int* block_count,
+ std::atomic<int32_t>* block_count,
BlockIdPB* dest_block_id);
+ // Download and rewrite remote block if end_status it hasn't been set as
failed.
+ // This method calls DownloadAndRewriteBlock and return its' status.
+ // This method is thread-safe.
+ //
+ // On failure, end_status is set as error status of DownloadAndRewriteBlock.
+ Status DownloadAndRewriteBlockIfEndStatusOK(const BlockIdPB& src_block_id,
+ int num_blocks,
+ std::atomic<int32_t>*
block_count,
+ BlockIdPB* dest_block_id,
+ Status* end_status);
+
// Download a single block.
// Data block is opened with new ID. After downloading, the block is
finalized
// and added to the tablet copy's transaction.
@@ -278,6 +302,13 @@ class TabletCopyClient {
// Block transaction for the tablet copy.
std::unique_ptr<fs::BlockCreationTransaction> transaction_;
+ // Thread pool for downloading all data blocks in parallel.
+ std::unique_ptr<ThreadPool> blocks_download_pool_;
+
+ // Protects adding/creating blocks, adding a rowset,
+ // reading/updating rowset download status.
+ simple_spinlock simple_lock_;
+
DISALLOW_COPY_AND_ASSIGN(TabletCopyClient);
};