This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 56ce1ad  KUDU-1728 parallelize download blocks in tablet-copy-client
56ce1ad is described below

commit 56ce1ad8bda24dbaed36c0f059e13a3d8e25b1d0
Author: ningw <[email protected]>
AuthorDate: Mon Aug 3 11:20:56 2020 +0800

    KUDU-1728 parallelize download blocks in tablet-copy-client
    
    Parallelize the action of 'Download blocks' in tablet-copy-client.
    
    Previously, blocks were downloaded from the remote tablet server
    sequentially, so operations that depend on 'DownloadBlocks', such as
    'recover from other tserver' and 'cluster rebalance', could be slow.
    Downloading blocks with a single thread can also be slow even when
    network bandwidth isn't the bottleneck.
    
    This commit introduces the --tablet_copy_download_threads_nums_per_session
    flag to control the number of threads used to download blocks within a
    tablet copy client session.
    
    A simple benchmark is attached below; each result is the average of
    8 runs. Results are in seconds, rounded to 2 decimal places.
    
    The experiment downloads all data from a remote tserver to a local tserver.
    
    Settings:
    Remote machine: 8 cores 32g tserver limited to 8g, tserver version 1.10
    Local machine: 8 cores 12g memory
    The tserver hosts 3 x 500M tablets, 3 x 430M tablets, 3 x 420M tablets,
    and 259 x 8M tablets.
    All tablets were fully compacted, with a DiskRowSet height of 1.0.
    
    Two variables are used to control the experiment:
    the number of tablet copy client threads, denoted tc;
    the number of block download threads within each tablet copy client
    (--tablet_copy_download_threads_nums_per_session), denoted bd.
    
    For example, the result 37.58 in column 'tc-2', row 'bd-4' means that it
    takes 37.58s to download all data from the tserver using 2 tablet copy
    client threads, where each tablet copy client uses 4 threads to download
    blocks.
    
    All results in column tc-1 correspond to the behavior before this patch.
    
    time     tc-1      tc-2      tc-4      tc-8
    
    bd-1     76.91     48.50     34.27     32.39
    bd-2     54.45     43.26     33.27     32.37
    bd-4     48.36     37.58     33.31     32.30
    bd-8     47.95     38.36     33.98     32.60
    
    Change-Id: Id83abca7a38cf183d9c27d82bb8a022699079e0e
    Reviewed-on: http://gerrit.cloudera.org:8080/16274
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/tserver/tablet_copy_client-test.cc |  16 ++-
 src/kudu/tserver/tablet_copy_client.cc      | 195 +++++++++++++++++++---------
 src/kudu/tserver/tablet_copy_client.h       |  35 ++++-
 3 files changed, 183 insertions(+), 63 deletions(-)

diff --git a/src/kudu/tserver/tablet_copy_client-test.cc 
b/src/kudu/tserver/tablet_copy_client-test.cc
index 2f10c1b..10b3470 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -75,6 +75,7 @@ using std::thread;
 using std::vector;
 
 DECLARE_double(env_inject_eio);
+DECLARE_int32(tablet_copy_download_threads_nums_per_session);
 DECLARE_string(block_manager);
 DECLARE_string(env_inject_eio_globs);
 
@@ -212,6 +213,19 @@ TEST_F(TabletCopyClientTest, TestLifeCycle) {
     ASSERT_FALSE(meta_);
   }
 
+  // Rowsets are downloaded by multiple threads; the error status must be
+  // correctly collected from all of them.
+  {
+    google::FlagSaver fs;
+    FLAGS_env_inject_eio = 0.5;
+    FLAGS_tablet_copy_download_threads_nums_per_session = 16;
+    s = StartCopy();
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "Failed to write tablet metadata");
+    ASSERT_EQ(TabletCopyClient::State::kInitialized, client_->state_);
+    ASSERT_FALSE(meta_);
+  }
+
   // Now let's try replacing a tablet. Set a metadata that we can replace.
   ASSERT_OK(ResetTabletCopyClient());
   ASSERT_OK(StartCopy());
@@ -360,7 +374,7 @@ TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) {
   // test is to exemplify the difference in syncs between the log and file
   // block managers, but it would be nice to formulate a bound here.
   if (FLAGS_block_manager == "log") {
-    ASSERT_GE(9, down_cast<Counter*>(
+    ASSERT_GE(15, down_cast<Counter*>(
         
metric_entity_->FindOrNull(METRIC_block_manager_total_disk_sync).get())->value());
   } else {
     ASSERT_GE(22, down_cast<Counter*>(
diff --git a/src/kudu/tserver/tablet_copy_client.cc 
b/src/kudu/tserver/tablet_copy_client.cc
index d783565..6a16794 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -18,7 +18,9 @@
 #include "kudu/tserver/tablet_copy_client.h"
 
 #include <cstdint>
+#include <functional>
 #include <memory>
+#include <mutex>
 #include <ostream>
 #include <utility>
 
@@ -68,6 +70,7 @@
 #include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
 
 DEFINE_int32(tablet_copy_begin_session_timeout_ms, 30000,
@@ -100,6 +103,11 @@ DEFINE_double(tablet_copy_fault_crash_before_write_cmeta, 
0.0,
 TAG_FLAG(tablet_copy_fault_crash_before_write_cmeta, unsafe);
 TAG_FLAG(tablet_copy_fault_crash_before_write_cmeta, runtime);
 
+DEFINE_int32(tablet_copy_download_threads_nums_per_session, 4,
+             "Number of threads per tablet copy session for downloading tablet 
data blocks.");
+DEFINE_validator(tablet_copy_download_threads_nums_per_session,
+     [](const char* /*n*/, int32_t v) { return v > 0; });
+
 DECLARE_int32(tablet_copy_transfer_chunk_size_bytes);
 
 METRIC_DEFINE_counter(server, tablet_copy_bytes_fetched,
@@ -130,10 +138,10 @@ using fs::BlockManager;
 using fs::CreateBlockOptions;
 using fs::WritableBlock;
 using rpc::Messenger;
+using std::atomic;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
-using std::vector;
 using strings::Substitute;
 using tablet::ColumnDataPB;
 using tablet::DeltaDataPB;
@@ -170,6 +178,10 @@ TabletCopyClient::TabletCopyClient(
   if (tablet_copy_metrics_) {
     tablet_copy_metrics_->open_client_sessions->Increment();
   }
+  ThreadPoolBuilder("blocks-download-pool-" + tablet_id)
+    .set_max_threads(FLAGS_tablet_copy_download_threads_nums_per_session)
+    .set_min_threads(FLAGS_tablet_copy_download_threads_nums_per_session)
+    .Build(&blocks_download_pool_);
 }
 
 TabletCopyClient::~TabletCopyClient() {
@@ -178,6 +190,7 @@ TabletCopyClient::~TabletCopyClient() {
                                              LogPrefix()));
   WARN_NOT_OK(Abort(), Substitute("$0Failed to fully clean up tablet after 
aborted copy",
                                   LogPrefix()));
+  blocks_download_pool_->Shutdown();
   if (tablet_copy_metrics_) {
     tablet_copy_metrics_->open_client_sessions->IncrementBy(-1);
   }
@@ -400,7 +413,9 @@ Status TabletCopyClient::FetchAll(const 
scoped_refptr<TabletReplica>& tablet_rep
 
   tablet_replica_ = tablet_replica;
 
-  // Download all the files (serially, for now, but in parallel in the future).
+  // Download all the tablet's files.
+  // Data blocks are downloaded in parallel, where the concurrency is
+  // controlled by the --tablet_copy_download_threads_nums_per_session flag.
   RETURN_NOT_OK(DownloadBlocks());
   RETURN_NOT_OK(DownloadWALs());
 
@@ -572,72 +587,109 @@ int TabletCopyClient::CountRemoteBlocks() const {
   return num_blocks;
 }
 
+void TabletCopyClient::DownloadRowset(const RowSetDataPB& src_rowset,
+                                      int num_remote_blocks,
+                                      atomic<int>* block_count,
+                                      Status* end_status) {
+  RowSetDataPB* dst_rowset;
+  {
+    std::lock_guard<simple_spinlock> l(simple_lock_);
+    if (!end_status->ok()) {
+      return;
+    }
+    // Create rowset.
+    dst_rowset = superblock_->add_rowsets();
+  }
+  *dst_rowset = src_rowset;
+  // Clear the data in the rowset so that we don't end up deleting the wrong
+  // blocks (using the ids of the remote blocks) if we fail.
+  // TODO(mpercy): This is pretty fragile. Consider building a class
+  // structure on top of SuperBlockPB to abstract copying details.
+  dst_rowset->clear_columns();
+  dst_rowset->clear_redo_deltas();
+  dst_rowset->clear_undo_deltas();
+  dst_rowset->clear_bloom_block();
+  dst_rowset->clear_adhoc_index_block();
+
+  // We can't leave superblock_ unserializable with unset required field
+  // values in child elements, so we must download and rewrite each block
+  // before referencing it in the rowset.
+  Status s;
+  for (const ColumnDataPB& src_col : src_rowset.columns()) {
+    BlockIdPB new_block_id;
+    s = DownloadAndRewriteBlockIfEndStatusOK(src_col.block(), 
num_remote_blocks,
+                                             block_count, &new_block_id, 
end_status);
+    if (!s.ok()) {
+      return;
+    }
+    ColumnDataPB* dst_col = dst_rowset->add_columns();
+    *dst_col = src_col;
+    *dst_col->mutable_block() = new_block_id;
+  }
+  for (const DeltaDataPB& src_redo : src_rowset.redo_deltas()) {
+    BlockIdPB new_block_id;
+    s = DownloadAndRewriteBlockIfEndStatusOK(src_redo.block(), 
num_remote_blocks,
+                                             block_count, &new_block_id, 
end_status);
+    if (!s.ok()) {
+      return;
+    }
+    DeltaDataPB* dst_redo = dst_rowset->add_redo_deltas();
+    *dst_redo = src_redo;
+    *dst_redo->mutable_block() = new_block_id;
+  }
+  for (const DeltaDataPB& src_undo : src_rowset.undo_deltas()) {
+    BlockIdPB new_block_id;
+    s = DownloadAndRewriteBlockIfEndStatusOK(src_undo.block(), 
num_remote_blocks,
+                                             block_count, &new_block_id, 
end_status);
+    if (!s.ok()) {
+      return;
+    }
+    DeltaDataPB* dst_undo = dst_rowset->add_undo_deltas();
+    *dst_undo = src_undo;
+    *dst_undo->mutable_block() = new_block_id;
+  }
+  if (src_rowset.has_bloom_block()) {
+    BlockIdPB new_block_id;
+    s = DownloadAndRewriteBlockIfEndStatusOK(src_rowset.bloom_block(), 
num_remote_blocks,
+                                             block_count, &new_block_id, 
end_status);
+    if (!s.ok()) {
+      return;
+    }
+    *dst_rowset->mutable_bloom_block() = new_block_id;
+  }
+  if (src_rowset.has_adhoc_index_block()) {
+    BlockIdPB new_block_id;
+    s = DownloadAndRewriteBlockIfEndStatusOK(src_rowset.adhoc_index_block(), 
num_remote_blocks,
+                                             block_count, &new_block_id, 
end_status);
+    if (!s.ok()) {
+      return;
+    }
+    *dst_rowset->mutable_adhoc_index_block() = new_block_id;
+  }
+}
+
 Status TabletCopyClient::DownloadBlocks() {
   CHECK_EQ(kStarted, state_);
 
   // Count up the total number of blocks to download.
-  int num_remote_blocks = CountRemoteBlocks();
+  const int num_remote_blocks = CountRemoteBlocks();
 
   // Download each block, writing the new block IDs into the new superblock
   // as each block downloads.
-  int block_count = 0;
+  atomic<int32_t> block_count(0);
   LOG_WITH_PREFIX(INFO) << "Starting download of " << num_remote_blocks << " 
data blocks...";
+
+  Status end_status = Status::OK();
+  // Download rowsets in parallel.
+  // Each task downloads the blocks of the corresponding rowset sequentially.
   for (const RowSetDataPB& src_rowset : remote_superblock_->rowsets()) {
-    // Create rowset.
-    RowSetDataPB* dst_rowset = superblock_->add_rowsets();
-    *dst_rowset = src_rowset;
-    // Clear the data in the rowset so that we don't end up deleting the wrong
-    // blocks (using the ids of the remote blocks) if we fail.
-    // TODO(mpercy): This is pretty fragile. Consider building a class
-    // structure on top of SuperBlockPB to abstract copying details.
-    dst_rowset->clear_columns();
-    dst_rowset->clear_redo_deltas();
-    dst_rowset->clear_undo_deltas();
-    dst_rowset->clear_bloom_block();
-    dst_rowset->clear_adhoc_index_block();
-
-    // We can't leave superblock_ unserializable with unset required field
-    // values in child elements, so we must download and rewrite each block
-    // before referencing it in the rowset.
-    for (const ColumnDataPB& src_col : src_rowset.columns()) {
-      BlockIdPB new_block_id;
-      RETURN_NOT_OK(DownloadAndRewriteBlock(src_col.block(), num_remote_blocks,
-                                            &block_count, &new_block_id));
-      ColumnDataPB* dst_col = dst_rowset->add_columns();
-      *dst_col = src_col;
-      *dst_col->mutable_block() = new_block_id;
-    }
-    for (const DeltaDataPB& src_redo : src_rowset.redo_deltas()) {
-      BlockIdPB new_block_id;
-      RETURN_NOT_OK(DownloadAndRewriteBlock(src_redo.block(), 
num_remote_blocks,
-                                            &block_count, &new_block_id));
-      DeltaDataPB* dst_redo = dst_rowset->add_redo_deltas();
-      *dst_redo = src_redo;
-      *dst_redo->mutable_block() = new_block_id;
-    }
-    for (const DeltaDataPB& src_undo : src_rowset.undo_deltas()) {
-      BlockIdPB new_block_id;
-      RETURN_NOT_OK(DownloadAndRewriteBlock(src_undo.block(), 
num_remote_blocks,
-                                            &block_count, &new_block_id));
-      DeltaDataPB* dst_undo = dst_rowset->add_undo_deltas();
-      *dst_undo = src_undo;
-      *dst_undo->mutable_block() = new_block_id;
-    }
-    if (src_rowset.has_bloom_block()) {
-      BlockIdPB new_block_id;
-      RETURN_NOT_OK(DownloadAndRewriteBlock(src_rowset.bloom_block(), 
num_remote_blocks,
-                                            &block_count, &new_block_id));
-      *dst_rowset->mutable_bloom_block() = new_block_id;
-    }
-    if (src_rowset.has_adhoc_index_block()) {
-      BlockIdPB new_block_id;
-      RETURN_NOT_OK(DownloadAndRewriteBlock(src_rowset.adhoc_index_block(), 
num_remote_blocks,
-                                            &block_count, &new_block_id));
-      *dst_rowset->mutable_adhoc_index_block() = new_block_id;
-    }
+    RETURN_NOT_OK(blocks_download_pool_->Submit([&]() mutable {
+      DownloadRowset(src_rowset, num_remote_blocks, &block_count, &end_status);
+    }));
   }
+  blocks_download_pool_->Wait();
 
-  return Status::OK();
+  return end_status;
 }
 
 Status TabletCopyClient::DownloadWAL(uint64_t wal_segment_seqno) {
@@ -688,12 +740,12 @@ Status TabletCopyClient::WriteConsensusMetadata() {
 
 Status TabletCopyClient::DownloadAndRewriteBlock(const BlockIdPB& src_block_id,
                                                  int num_blocks,
-                                                 int* block_count,
+                                                 atomic<int32_t>* block_count,
                                                  BlockIdPB* dest_block_id) {
   BlockId old_block_id(BlockId::FromPB(src_block_id));
   SetStatusMessage(Substitute("Downloading block $0 ($1/$2)",
                               old_block_id.ToString(),
-                              *block_count + 1, num_blocks));
+                              block_count->load() + 1 , num_blocks));
   BlockId new_block_id;
   RETURN_NOT_OK_PREPEND(DownloadBlock(old_block_id, &new_block_id),
       "Unable to download block with id " + old_block_id.ToString());
@@ -703,12 +755,32 @@ Status TabletCopyClient::DownloadAndRewriteBlock(const 
BlockIdPB& src_block_id,
   return Status::OK();
 }
 
+Status TabletCopyClient::DownloadAndRewriteBlockIfEndStatusOK(const BlockIdPB& 
src_block_id,
+                                                              int num_blocks,
+                                                              atomic<int32_t>* 
block_count,
+                                                              BlockIdPB* 
dest_block_id,
+                                                              Status* 
end_status) {
+  {
+    std::lock_guard<simple_spinlock> l(simple_lock_);
+    RETURN_NOT_OK(*end_status);
+  }
+  Status s = DownloadAndRewriteBlock(src_block_id, num_blocks, block_count, 
dest_block_id);
+  if (!s.ok()) {
+    std::lock_guard<simple_spinlock> l(simple_lock_);
+    if (!s.ok() && end_status->ok()) {
+      *end_status = s;
+    }
+  }
+  return s;
+}
+
 Status TabletCopyClient::DownloadBlock(const BlockId& old_block_id,
                                        BlockId* new_block_id) {
   VLOG_WITH_PREFIX(1) << "Downloading block with block_id " << 
old_block_id.ToString();
   RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(), "Not downloading block for 
replica");
 
   unique_ptr<WritableBlock> block;
+  // log_block_manager uses a lock to guarantee the block_id is unique.
   RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(CreateBlockOptions({ 
tablet_id_ }), &block),
                         "Unable to create new block");
 
@@ -721,7 +793,10 @@ Status TabletCopyClient::DownloadBlock(const BlockId& 
old_block_id,
 
   *new_block_id = block->id();
   RETURN_NOT_OK_PREPEND(block->Finalize(), "Unable to finalize block");
-  transaction_->AddCreatedBlock(std::move(block));
+  {
+    std::lock_guard<simple_spinlock> l(simple_lock_);
+    transaction_->AddCreatedBlock(std::move(block));
+  }
   return Status::OK();
 }
 
diff --git a/src/kudu/tserver/tablet_copy_client.h 
b/src/kudu/tserver/tablet_copy_client.h
index 27ed996..52d91ba 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <atomic>
 #include <cstdint>
 #include <memory>
 #include <string>
@@ -25,6 +26,7 @@
 
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
@@ -35,6 +37,7 @@ class BlockId;
 class BlockIdPB;
 class FsManager;
 class HostPort;
+class ThreadPool;
 
 namespace consensus {
 class ConsensusMetadata;
@@ -52,14 +55,15 @@ class RpcController;
 } // namespace rpc
 
 namespace tablet {
+class RowSetDataPB;
 class TabletMetadata;
 class TabletReplica;
 class TabletSuperBlockPB;
 } // namespace tablet
 
 namespace tserver {
-class DataIdPB;
 class DataChunkPB;
+class DataIdPB;
 class TabletCopyServiceProxy;
 
 // Server-wide tablet copy metrics.
@@ -193,6 +197,15 @@ class TabletCopyClient {
   // Count the number of blocks on the remote (from 'remote_superblock_').
   int CountRemoteBlocks() const;
 
+  // A task to download blocks of the specified rowset.
+  // In case of a failure, a thread notifies others via 'end_status',
+  // so they can abort downloading of their rowsets' blocks.
+  // If all threads succeed, 'end_status' is set to Status::OK().
+  void DownloadRowset(const tablet::RowSetDataPB& src_rowset,
+                      int num_remote_blocks,
+                      std::atomic<int>* block_count,
+                      Status* end_status);
+
   // Download all blocks belonging to a tablet sequentially. Add all
   // downloaded blocks to the tablet copy's transaction.
   //
@@ -210,9 +223,20 @@ class TabletCopyClient {
   // - 'block_count' is incremented by 1.
   Status DownloadAndRewriteBlock(const BlockIdPB& src_block_id,
                                  int num_blocks,
-                                 int* block_count,
+                                 std::atomic<int32_t>* block_count,
                                  BlockIdPB* dest_block_id);
 
+  // Download and rewrite the remote block if 'end_status' hasn't been set to
+  // a failure. This method calls DownloadAndRewriteBlock and returns its status.
+  // This method is thread-safe.
+  //
+  // On failure, 'end_status' records the first error seen across all threads.
+  Status DownloadAndRewriteBlockIfEndStatusOK(const BlockIdPB& src_block_id,
+                                              int num_blocks,
+                                              std::atomic<int32_t>* 
block_count,
+                                              BlockIdPB* dest_block_id,
+                                              Status* end_status);
+
   // Download a single block.
   // Data block is opened with new ID. After downloading, the block is 
finalized
   // and added to the tablet copy's transaction.
@@ -278,6 +302,13 @@ class TabletCopyClient {
   // Block transaction for the tablet copy.
   std::unique_ptr<fs::BlockCreationTransaction> transaction_;
 
+  // Thread pool for downloading all data blocks in parallel.
+  std::unique_ptr<ThreadPool> blocks_download_pool_;
+
+  // Protects adding/creating blocks, adding a rowset,
+  // reading/updating rowset download status.
+  simple_spinlock simple_lock_;
+
   DISALLOW_COPY_AND_ASSIGN(TabletCopyClient);
 };
 

Reply via email to