This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 277586441c849ac639d24d78b98c0c3bfcce06c8
Author: xinghuayu007 <[email protected]>
AuthorDate: Wed Mar 15 11:25:02 2023 +0800

    KUDU-3459 Download superblock piece by piece
    
    Currently, Kudu downloads the superblock all at once rather than
    piece by piece. When the superblock is very large, it may exceed
    the maximum RPC message size defined by --rpc_max_message_size,
    in which case downloading the superblock fails.
    
    This patch provides another way to download the superblock:
    piece by piece. A new flag,
    --tablet_copy_support_download_superblock_in_batch, is added. The
    original logic is kept unchanged. When the superblock is found to be
    very large and the flag is set to true, the download automatically
    switches to fetching the superblock piece by piece.
    
    Change-Id: I3a7c212784383e247566a4701965e2897de83303
    Reviewed-on: http://gerrit.cloudera.org:8080/19620
    Reviewed-by: Yingchun Lai <[email protected]>
    Tested-by: Yingchun Lai <[email protected]>
---
 src/kudu/tools/kudu-tool-test.cc               | 145 +++++++++++++++++++++++++
 src/kudu/tools/tool_action_local_replica.cc    |   2 +
 src/kudu/tserver/tablet_copy.proto             |   9 +-
 src/kudu/tserver/tablet_copy_client.cc         |  85 +++++++++++++--
 src/kudu/tserver/tablet_copy_client.h          |   4 +
 src/kudu/tserver/tablet_copy_service.cc        |  21 +++-
 src/kudu/tserver/tablet_copy_source_session.cc |  36 ++++++
 src/kudu/tserver/tablet_copy_source_session.h  |  11 +-
 8 files changed, 302 insertions(+), 11 deletions(-)

diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 03a3541f7..cfd331a84 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <limits.h>
 #include <sys/stat.h>
 
 #include <algorithm>
@@ -103,6 +104,7 @@
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/mini-cluster/mini_cluster.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/transfer.h"
 #include "kudu/subprocess/subprocess_protocol.h"
 #include "kudu/tablet/local_tablet_writer.h"
 #include "kudu/tablet/metadata.pb.h"
@@ -125,6 +127,7 @@
 #include "kudu/util/async_util.h"
 #include "kudu/util/env.h"
 #include "kudu/util/jsonreader.h"
+#include "kudu/util/logging_test_util.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
@@ -152,6 +155,7 @@ DECLARE_bool(fs_data_dirs_consider_available_space);
 DECLARE_bool(hive_metastore_sasl_enabled);
 DECLARE_bool(show_values);
 DECLARE_bool(show_attributes);
+DECLARE_bool(tablet_copy_support_download_superblock_in_batch);
 DECLARE_int32(catalog_manager_inject_latency_load_ca_info_ms);
 DECLARE_int32(flush_threshold_mb);
 DECLARE_int32(flush_threshold_secs);
@@ -9347,6 +9351,147 @@ TEST_F(ToolTest, TestLocalReplicaCopyRemote) {
   }
 }
 
+
+class DownloadSuperblockInBatchTest :
+    public ToolTest,
+    public ::testing::WithParamInterface<std::tuple<bool, bool>> {
+};
+
+// Parameters: <--encrypt_data_at_rest, --tablet_copy_support_download_superblock_in_batch>.
+INSTANTIATE_TEST_SUITE_P(DownloadSuperblockInBatchParameterized, DownloadSuperblockInBatchTest,
+                         ::testing::Combine(::testing::Bool(),
+                                            ::testing::Bool()));
+
+// Copies a tablet whose superblock is larger than --rpc_max_message_size.
+// With batch download disabled the copy must fail with a network error;
+// with it enabled the superblock is fetched piece by piece and the copied
+// tablet must show up on the destination tablet server.
+TEST_P(DownloadSuperblockInBatchTest, TestDownloadSuperblockInBatch) {
+  constexpr const char* const kTableName = "test_table";
+  // Generating a large superblock takes a lot of time, while setting
+  // --rpc_max_message_size very small makes ordinary RPC requests fail.
+  // Therefore, grow the superblock only up to this moderate size.
+  constexpr int kSuperblockSize = 20000;
+  FLAGS_encrypt_data_at_rest = std::get<0>(GetParam());
+  FLAGS_tablet_copy_support_download_superblock_in_batch = std::get<1>(GetParam());
+  InternalMiniClusterOptions opts;
+  opts.num_tablet_servers = 2;
+  FLAGS_allow_unsafe_replication_factor = true;
+  NO_FATALS(StartMiniCluster(std::move(opts)));
+  // Get a kudu client.
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(mini_cluster_->CreateClient(nullptr, &client));
+
+  // Generate a schema.
+  KuduSchemaBuilder schema_builder;
+  schema_builder.AddColumn("key")
+      ->Type(client::KuduColumnSchema::INT32)
+      ->NotNull()
+      ->PrimaryKey();
+  KuduSchema schema;
+  ASSERT_OK(schema_builder.Build(&schema));
+  unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+  ASSERT_OK(lower_bound->SetInt32("key", 0));
+  unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+  ASSERT_OK(upper_bound->SetInt32("key", 1));
+  unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+  // Create a table with a single tablet.
+  ASSERT_OK(table_creator->table_name(kTableName)
+            .schema(&schema)
+            .set_range_partition_columns({ "key" })
+            .add_range_partition(lower_bound.release(), upper_bound.release())
+            .num_replicas(1)
+            .Create());
+
+  vector<string> tablet_ids_first = mini_cluster_->mini_tablet_server(0)->ListTablets();
+  vector<string> tablet_ids_second = mini_cluster_->mini_tablet_server(1)->ListTablets();
+  MiniTabletServer* src_tserver;
+  MiniTabletServer* dst_tserver;
+  string tablet_id_to_copy;
+  // Decide which tserver is the source and which one is the destination.
+  // There is only one single-replica tablet in this cluster, so the tserver
+  // that lists more tablets is the one hosting it, i.e. the source.
+  // The single tablet is copied from the source tserver to the destination tserver.
+  if (tablet_ids_first.size() > tablet_ids_second.size()) {
+    tablet_id_to_copy = tablet_ids_first.at(0);
+    src_tserver = mini_cluster_->mini_tablet_server(0);
+    dst_tserver = mini_cluster_->mini_tablet_server(1);
+  } else {
+    tablet_id_to_copy = tablet_ids_second.at(0);
+    src_tserver = mini_cluster_->mini_tablet_server(1);
+    dst_tserver = mini_cluster_->mini_tablet_server(0);
+  }
+
+  // Add new columns to the table until the superblock file is larger than kSuperblockSize.
+  const string superblock_path =
+      src_tserver->server()->fs_manager()->GetTabletMetadataPath(tablet_id_to_copy);
+  int i = 0;
+  while (true) {
+    string column_name = Substitute("col_$0", i);
+    unique_ptr<KuduTableAlterer> table_alterer(client->NewTableAlterer(kTableName));
+    table_alterer->AddColumn(column_name)->Type(client::KuduColumnSchema::INT32)
+                 ->NotNull()->Default(KuduValue::FromInt(INT_MAX))
+                 ->Comment("testtttttttttttttttttttttttttttttttttttttttttttttt");
+    ASSERT_OK(table_alterer->Alter());
+    uint64_t file_size = 0;
+    ASSERT_OK(env_->GetFileSize(superblock_path, &file_size));
+    if (file_size > static_cast<uint64_t>(kSuperblockSize)) {
+      break;
+    }
+    i++;
+  }
+  // Restart the source tserver with a small --rpc_max_message_size, so that the
+  // size of the superblock easily exceeds the maximum RPC message size.
+  FLAGS_rpc_max_message_size = kSuperblockSize / 4;
+  ASSERT_OK(src_tserver->Restart());
+
+  string source_tserver_rpc_addr = src_tserver->bound_rpc_addr().ToString();
+  string wal_dir = dst_tserver->options()->fs_opts.wal_root;
+  string data_dirs = JoinStrings(dst_tserver->options()->fs_opts.data_roots, ",");
+  NO_FATALS(dst_tserver->Shutdown());
+
+  // Copy the tablet replica from the source tserver to the destination tserver.
+  StringVectorSink capture_logs;
+  ScopedRegisterSink reg(&capture_logs);
+  string stderr;
+  RunActionStdoutStderrString(
+      Substitute("local_replica copy_from_remote $0 $1 "
+                 "-fs_data_dirs=$2 -fs_wal_dir=$3 "
+                 "--tablet_copy_support_download_superblock_in_batch=$4 "
+                 // Disable --rpc_max_message_size_enable_validation, so
+                 // --rpc_max_message_size can be set to a small value.
+                 "--rpc_max_message_size_enable_validation=false "
+                 // Set --rpc_max_message_size very small, so that the superblock
+                 // easily exceeds it. This reproduces the network error asserted
+                 // below when downloading the superblock in batch is disabled.
+                 "--rpc_max_message_size=$5 "
+                 // This flag and --rpc_max_message_size are checked together by a
+                 // group flag validator, so it must be set to a small value as well.
+                 "--consensus_max_batch_size_bytes=$6 "
+                 "--encrypt_data_at_rest=$7 "
+                 "--tablet_copy_transfer_chunk_size_bytes=50",
+                 tablet_id_to_copy,
+                 source_tserver_rpc_addr,
+                 data_dirs,
+                 wal_dir,
+                 FLAGS_tablet_copy_support_download_superblock_in_batch,
+                 (kSuperblockSize / 4),
+                 (kSuperblockSize / 8),
+                 FLAGS_encrypt_data_at_rest), nullptr, &stderr);
+  // The superblock is larger than --rpc_max_message_size, so fetching it in a
+  // single RPC causes a network error and downloading the superblock fails.
+  if (!FLAGS_tablet_copy_support_download_superblock_in_batch) {
+    ASSERT_STR_CONTAINS(stderr, "recv error: Network error: RPC frame had a length");
+    return;
+  } else { // Download superblock piece by piece.
+    ASSERT_STR_CONTAINS(JoinStrings(capture_logs.logged_msgs(), "\n"),
+                        "Superblock is very large, it maybe over the rpc messsage size, "
+                        "which is controlled by flag --rpc_max_message_size. Switch it into "
+                        "downloading superblock piece by piece");
+  }
+
+  NO_FATALS(dst_tserver->Start());
+  const vector<string>& target_tablet_ids = dst_tserver->ListTablets();
+  // The copied tablet must exist on the destination tablet server.
+  ASSERT_TRUE(find(target_tablet_ids.begin(), target_tablet_ids.end(), tablet_id_to_copy)
+              != target_tablet_ids.end());
+}
+
 TEST_F(ToolTest, TestRebuildTserverByLocalReplicaCopy) {
   SKIP_IF_SLOW_NOT_ALLOWED();
   // Local copies are not supported on encrypted severs at this time.
diff --git a/src/kudu/tools/tool_action_local_replica.cc 
b/src/kudu/tools/tool_action_local_replica.cc
index 821796cff..b3c85bb03 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -157,6 +157,7 @@ DEFINE_string(dst_fs_metadata_dir, "",
               "will be used as the metadata directory.");;
 
 DECLARE_int32(num_threads);
+DECLARE_bool(tablet_copy_support_download_superblock_in_batch);
 DECLARE_int32(tablet_copy_download_threads_nums_per_session);
 DECLARE_string(tables);
 
@@ -1368,6 +1369,7 @@ unique_ptr<Mode> BuildLocalReplicaMode() {
       .AddOptionalParameter("fs_wal_dir")
       .AddOptionalParameter("tablet_copy_download_threads_nums_per_session")
       .AddOptionalParameter("num_threads")
+      .AddOptionalParameter("tablet_copy_support_download_superblock_in_batch")
       .Build();
 
   unique_ptr<Action> copy_from_local =
diff --git a/src/kudu/tserver/tablet_copy.proto 
b/src/kudu/tserver/tablet_copy.proto
index 6260bd5f8..f216249df 100644
--- a/src/kudu/tserver/tablet_copy.proto
+++ b/src/kudu/tserver/tablet_copy.proto
@@ -95,6 +95,9 @@ message BeginTabletCopySessionRequestPB {
 
   // tablet_id of the tablet the requester desires to bootstrap from.
   required bytes tablet_id = 2;
+
+  // Whether to download superblock in batch.
+  optional bool auto_download_superblock_in_batch = 3 [default = false];
 }
 
 message BeginTabletCopySessionResponsePB {
@@ -108,7 +111,7 @@ message BeginTabletCopySessionResponsePB {
   required uint64 session_idle_timeout_millis = 2;
 
   // Active superblock at the time of the request.
-  required tablet.TabletSuperBlockPB superblock = 3;
+  optional tablet.TabletSuperBlockPB superblock = 3;
 
   // Identifiers for the WAL segments available for download.
   // Each WAL segment is keyed by its sequence number.
@@ -120,6 +123,9 @@ message BeginTabletCopySessionResponsePB {
 
   // permanent_uuid of the responding peer.
   optional bytes responder_uuid = 6;
+
+  // To represent that superblock is too large to download directly.
+  optional bool superblock_is_too_large = 7 [default = false];
 }
 
 message CheckTabletCopySessionActiveRequestPB {
@@ -142,6 +148,7 @@ message DataIdPB {
     UNKNOWN = 0;
     BLOCK = 1;
     LOG_SEGMENT = 2;
+    SUPER_BLOCK = 3;
   }
 
   // Indicator whether it's a block or log segment id.
diff --git a/src/kudu/tserver/tablet_copy_client.cc 
b/src/kudu/tserver/tablet_copy_client.cc
index 4be399d79..47846bc59 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -30,7 +30,6 @@
 #include <glog/logging.h>
 #include <google/protobuf/stubs/port.h>
 
-#include "kudu/common/common.pb.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
@@ -74,6 +73,7 @@
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
@@ -143,6 +143,10 @@ 
DEFINE_int32(tablet_copy_download_threads_nums_per_session, 4,
 DEFINE_validator(tablet_copy_download_threads_nums_per_session,
      [](const char* /*n*/, int32_t v) { return v > 0; });
 
+DEFINE_bool(tablet_copy_support_download_superblock_in_batch, true,
+            "Whether to support download superblock in batch automatically "
+            "when it is very large. "
+            "When superblock is small, it can be downloaded once a time.");
+
 DECLARE_int32(tablet_copy_transfer_chunk_size_bytes);
 
 METRIC_DEFINE_counter(server, tablet_copy_bytes_fetched,
@@ -298,6 +302,8 @@ Status RemoteTabletCopyClient::Start(const HostPort& 
copy_source_addr,
   BeginTabletCopySessionRequestPB req;
   req.set_requestor_uuid(dst_fs_manager_->uuid());
   req.set_tablet_id(tablet_id_);
+  req.set_auto_download_superblock_in_batch(
+      FLAGS_tablet_copy_support_download_superblock_in_batch);
 
   rpc::RpcController controller;
 
@@ -306,12 +312,36 @@ Status RemoteTabletCopyClient::Start(const HostPort& 
copy_source_addr,
   RETURN_NOT_OK_PREPEND(SendRpcWithRetry(&controller, [&] {
     return proxy_->BeginTabletCopySession(req, &resp, &controller);
   }), "unable to begin tablet copy session");
-
+  session_id_ = resp.session_id();
+  if (resp.superblock_is_too_large()) {
+    // Download superblock from remote and store it in local file.
+    string superblock_path;
+    // Delete the temporary superblock file finally.
+    auto deleter = MakeScopedCleanup([&]() {
+      if (!superblock_path.empty() &&
+          dst_fs_manager_->env()->FileExists(superblock_path)) {
+        WARN_NOT_OK(dst_fs_manager_->env()->DeleteFile(superblock_path),
+                    Substitute("Could not delete temporary superblock file $0",
+                               superblock_path));
+      }
+    });
+    RETURN_NOT_OK_PREPEND(DownloadSuperBlock(&superblock_path),
+                          "Download super block failed");
+    remote_superblock_.reset(new tablet::TabletSuperBlockPB);
+    // Load superblock from a local temporary file.
+    RETURN_NOT_OK_PREPEND(
+        pb_util::ReadPBContainerFromPath(dst_fs_manager_->env(), 
superblock_path,
+                                         remote_superblock_.get(), 
pb_util::SENSITIVE),
+        Substitute("Could not load superblock from $0", superblock_path));
+  } else {
+    CHECK(resp.has_superblock());
+    remote_superblock_.reset(resp.release_superblock());
+  }
   TRACE("Tablet copy session begun");
 
   string copy_peer_uuid = resp.has_responder_uuid()
       ? resp.responder_uuid() : "(unknown uuid)";
-  TabletDataState data_state = resp.superblock().tablet_data_state();
+  TabletDataState data_state = remote_superblock_->tablet_data_state();
   if (data_state != tablet::TABLET_DATA_READY) {
     Status s = Status::IllegalState(
         Substitute("Remote peer ($0) is not ready itself! state: $1",
@@ -320,13 +350,9 @@ Status RemoteTabletCopyClient::Start(const HostPort& 
copy_source_addr,
     return s;
   }
 
-  session_id_ = resp.session_id();
   // Update our default RPC timeout to reflect the server's session timeout.
   session_idle_timeout_millis_ = resp.session_idle_timeout_millis();
 
-  // Store a copy of the remote (old) superblock.
-  remote_superblock_.reset(resp.release_superblock());
-
   // Make a copy of the remote superblock. We first clear out the remote blocks
   // from this structure and then add them back in as they are downloaded.
   superblock_.reset(new TabletSuperBlockPB(*remote_superblock_));
@@ -885,6 +911,51 @@ Status TabletCopyClient::DownloadBlock(const BlockId& 
old_block_id,
   return Status::OK();
 }
 
+// Downloads the remote superblock piece by piece with FetchData RPCs of type
+// SUPER_BLOCK and writes the bytes to a newly created local temporary file.
+// On return, 'superblock_path' holds the path of that file; the caller is
+// responsible for deleting it.
+Status RemoteTabletCopyClient::DownloadSuperBlock(string* superblock_path) {
+  DataIdPB data_id;
+  data_id.set_type(DataIdPB::SUPER_BLOCK);
+
+  rpc::RpcController controller;
+  // NOTE(review): Start() assigns session_idle_timeout_millis_ from the
+  // BeginTabletCopySession response only after this method returns, so this
+  // uses the value held before the session began -- confirm that is intended.
+  controller.set_timeout(MonoDelta::FromMilliseconds(session_idle_timeout_millis_));
+  FetchDataRequestPB req;
+  req.set_session_id(session_id_);
+  req.mutable_data_id()->CopyFrom(data_id);
+  req.set_max_length(FLAGS_tablet_copy_transfer_chunk_size_bytes);
+
+  // Create a temporary file to store the superblock.
+  // NOTE(review): the template is a relative path, so the file presumably
+  // lands in the process's working directory -- confirm this is desired.
+  string tmpl = "super_block.tmp.XXXXXX";
+  unique_ptr<RWFile> tmp_file;
+  RETURN_NOT_OK_PREPEND(dst_fs_manager_->env()->NewTempRWFile(
+      RWFileOptions(), tmpl, superblock_path, &tmp_file),
+                        "could not create temporary super block data file");
+
+  bool done = false;
+  uint64_t offset = 0;
+  while (!done) {
+    req.set_offset(offset);
+
+    // Request the next data chunk.
+    FetchDataResponsePB resp;
+    RETURN_NOT_OK_PREPEND(SendRpcWithRetry(&controller, [&] {
+        return proxy_->FetchData(req, &resp, &controller);
+    }), "unable to fetch data from remote");
+
+    // Sanity-check for corruption.
+    RETURN_NOT_OK_PREPEND(VerifyData(offset, resp.chunk()),
+                          Substitute("error validating data item $0",
+                                     pb_util::SecureShortDebugString(data_id)));
+
+    // Write the data.
+    RETURN_NOT_OK(tmp_file->Write(offset, resp.chunk().data()));
+
+    auto chunk_size = resp.chunk().data().size();
+    done = offset + chunk_size == resp.chunk().total_data_length();
+    // An empty chunk before the end would make no progress and the loop
+    // would never terminate; treat it as a malformed response instead.
+    if (chunk_size == 0 && !done) {
+      return Status::Corruption(
+          Substitute("received an empty superblock chunk at offset $0 of $1 bytes",
+                     offset, resp.chunk().total_data_length()));
+    }
+    offset += chunk_size;
+  }
+  RETURN_NOT_OK_PREPEND(tmp_file->Close(), "close superblock data file failed");
+  return Status::OK();
+}
+
 template<class Appendable>
 Status RemoteTabletCopyClient::DownloadFile(const DataIdPB& data_id,
                                             Appendable* appendable) {
diff --git a/src/kudu/tserver/tablet_copy_client.h 
b/src/kudu/tserver/tablet_copy_client.h
index 127334cba..5239477f9 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -344,6 +344,10 @@ class RemoteTabletCopyClient : public TabletCopyClient {
   // need to be in the header.
   template<class Appendable>
   Status DownloadFile(const DataIdPB& data_id, Appendable* appendable);
+
+  // Download the superblock from remote. The superblock will firstly be stored
+  // in a temporary local file 'superblock_path' and then loaded into the 
memory.
+  Status DownloadSuperBlock(std::string* superblock_path);
 };
 
 class LocalTabletCopyClient : public TabletCopyClient {
diff --git a/src/kudu/tserver/tablet_copy_service.cc 
b/src/kudu/tserver/tablet_copy_service.cc
index 540569bd2..473ee88d1 100644
--- a/src/kudu/tserver/tablet_copy_service.cc
+++ b/src/kudu/tserver/tablet_copy_service.cc
@@ -37,6 +37,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/transfer.h"
 #include "kudu/server/server_base.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet_replica.h"
@@ -204,13 +205,21 @@ void TabletCopyServiceImpl::BeginTabletCopySession(
   resp->set_responder_uuid(fs_manager_->uuid());
   resp->set_session_id(session_id);
   resp->set_session_idle_timeout_millis(FLAGS_tablet_copy_idle_timeout_sec * 
1000);
-  resp->mutable_superblock()->CopyFrom(session->tablet_superblock());
   resp->mutable_initial_cstate()->CopyFrom(session->initial_cstate());
 
   for (const scoped_refptr<log::ReadableLogSegment>& segment : 
session->log_segments()) {
     resp->add_wal_segment_seqnos(segment->header().sequence_number());
   }
-
+  if (req->auto_download_superblock_in_batch() &&
+      (resp->ByteSizeLong() + session->tablet_superblock().ByteSizeLong() >
+      FLAGS_rpc_max_message_size)) {
+    LOG(WARNING) << "Superblock is very large, it maybe over the rpc messsage 
size, "
+                    "which is controlled by flag --rpc_max_message_size. 
Switch it into "
+                    "downloading superblock piece by piece.";
+    resp->set_superblock_is_too_large(true);
+  } else {
+    resp->mutable_superblock()->CopyFrom(session->tablet_superblock());
+  }
   // For testing: Close the session prematurely if unsafe gflag is set but
   // still respond as if it was opened.
   const auto timeout_prob = FLAGS_tablet_copy_early_session_timeout_prob;
@@ -299,6 +308,10 @@ void TabletCopyServiceImpl::FetchData(const 
FetchDataRequestPB* req,
     RPC_RETURN_NOT_OK(session->GetBlockPiece(block_id, offset, client_maxlen,
                                              data, &total_data_length, 
&error_code),
                       error_code, "Unable to get piece of data block", 
context);
+  } else if (data_id.type() == DataIdPB::SUPER_BLOCK) {
+    RPC_RETURN_NOT_OK(session->GetSuperBlockPiece(offset, client_maxlen, data,
+                                                  &total_data_length, 
&error_code),
+                      error_code, "Unable to get piece of super block", 
context);
   } else {
     // Fetching a log segment chunk.
     uint64_t segment_seqno = data_id.wal_segment_seqno();
@@ -371,6 +384,10 @@ Status TabletCopyServiceImpl::FindSessionUnlocked(
 Status TabletCopyServiceImpl::ValidateFetchRequestDataId(
         const DataIdPB& data_id,
         TabletCopyErrorPB::Code* app_error) {
+  if (data_id.type() == DataIdPB::SUPER_BLOCK) {
+    return Status::OK();
+  }
+
   if (PREDICT_FALSE(data_id.has_block_id() && 
data_id.has_wal_segment_seqno())) {
     *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
     return Status::InvalidArgument(
diff --git a/src/kudu/tserver/tablet_copy_source_session.cc 
b/src/kudu/tserver/tablet_copy_source_session.cc
index cff1ee92f..aaa227d87 100644
--- a/src/kudu/tserver/tablet_copy_source_session.cc
+++ b/src/kudu/tserver/tablet_copy_source_session.cc
@@ -48,6 +48,7 @@
 #include "kudu/rpc/transfer.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/util/env.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
@@ -313,6 +314,34 @@ Status TabletCopySourceSession::GetBlockPiece(const 
BlockId& block_id,
   return Status::OK();
 }
 
+// Serves one piece of this session's superblock. A request for offset 0
+// serializes tablet_superblock_ into the session's temporary file
+// 'superblock_tmp_path_'. Every request then reads the bytes selected by
+// GetResponseDataSize() for 'offset'/'client_maxlen' from that file into
+// 'data', and reports the whole file's size in 'superblock_file_size'.
+Status TabletCopySourceSession::GetSuperBlockPiece(uint64_t offset,
+                                                   int64_t client_maxlen,
+                                                   string* data,
+                                                   int64_t* superblock_file_size,
+                                                   TabletCopyErrorPB::Code* error_code) {
+  if (offset == 0) {
+    // Use OVERWRITE rather than NO_OVERWRITE: the client issues FetchData via
+    // SendRpcWithRetry(), so a retried request for offset 0 may arrive after an
+    // earlier attempt already created the file. Rewriting it from the same
+    // tablet_superblock_ keeps such a retry from failing with AlreadyPresent.
+    RETURN_NOT_OK_PREPEND(pb_util::WritePBContainerToPath(
+                          fs_manager_->env(), superblock_tmp_path_, tablet_superblock_,
+                          pb_util::OVERWRITE, pb_util::SYNC,
+                          pb_util::SENSITIVE),
+                          Substitute("Failed to write tablet metadata $0", tablet_id_));
+  }
+  unique_ptr<RandomAccessFile> super_block_reader;
+  RETURN_NOT_OK(fs_manager_->env()->NewRandomAccessFile(superblock_tmp_path_,
+                                                        &super_block_reader));
+  uint64_t file_size = 0;
+  RETURN_NOT_OK(super_block_reader->Size(&file_size));
+  *superblock_file_size = file_size;
+  int64_t response_data_size = 0;
+  RETURN_NOT_OK(GetResponseDataSize(file_size, offset, client_maxlen,
+                                    error_code, &response_data_size));
+  data->resize(response_data_size);
+  uint8_t* buf = reinterpret_cast<uint8_t*>(const_cast<char*>(data->data()));
+  Slice slice(buf, response_data_size);
+  RETURN_NOT_OK_PREPEND(super_block_reader->Read(offset, slice), "read file failed");
+  return Status::OK();
+}
+
 Status TabletCopySourceSession::GetLogSegmentPiece(uint64_t segment_seqno,
                                                    uint64_t offset, int64_t 
client_maxlen,
                                                    string* data, int64_t* 
log_file_size,
@@ -457,9 +486,16 @@ 
RemoteTabletCopySourceSession::RemoteTabletCopySourceSession(
       tablet_replica_(std::move(tablet_replica)),
       session_id_(std::move(session_id)),
       requestor_uuid_(std::move(requestor_uuid)) {
+  // Use session id to generate the superblock temporary path.
+  // It is thread safety, beacause session id is unique.
+  superblock_tmp_path_ = Substitute("/tmp/tablet-meta-$0", session_id_);
 }
 
 RemoteTabletCopySourceSession::~RemoteTabletCopySourceSession() {
+  // Remove the temporary superblock file that GetSuperBlockPiece() may have
+  // written for this session. A failure to delete a temporary file is logged
+  // rather than aborting the whole server via CHECK.
+  if (!superblock_tmp_path_.empty() &&
+      fs_manager_->env()->FileExists(superblock_tmp_path_)) {
+    WARN_NOT_OK(fs_manager_->env()->DeleteFile(superblock_tmp_path_),
+                Substitute("Could not delete temporary superblock file $0",
+                           superblock_tmp_path_));
+  }
   // No lock taken in the destructor, should only be 1 thread with access now.
   CHECK_OK(UnregisterAnchorIfNeededUnlocked());
 }
diff --git a/src/kudu/tserver/tablet_copy_source_session.h 
b/src/kudu/tserver/tablet_copy_source_session.h
index 0e8439168..e7c0cb66b 100644
--- a/src/kudu/tserver/tablet_copy_source_session.h
+++ b/src/kudu/tserver/tablet_copy_source_session.h
@@ -20,6 +20,7 @@
 #include <cstdint>
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <unordered_map>
 #include <utility>
 
@@ -35,7 +36,6 @@
 #include "kudu/gutil/stl_util.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tserver/tablet_copy.pb.h"
-#include "kudu/util/env.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/once.h"
 #include "kudu/util/slice.h"
@@ -44,6 +44,7 @@
 namespace kudu {
 
 class FsManager;
+class RWFile;
 
 namespace tablet {
 class TabletReplica;
@@ -129,6 +130,13 @@ class TabletCopySourceSession : public 
RefCountedThreadSafe<TabletCopySourceSess
                        std::string* data, int64_t* block_file_size,
                        TabletCopyErrorPB::Code* error_code);
 
+  // Open superblock file and get data piece according to the offset.
+  Status GetSuperBlockPiece(uint64_t offset,
+                            int64_t client_maxlen,
+                            std::string* data,
+                            int64_t* superblock_file_size,
+                            TabletCopyErrorPB::Code* error_code);
+
   // Get a piece of a log segment.
   // The behavior and params are very similar to GetBlockPiece(), but this one
   // is only for sending WAL segment files.
@@ -195,6 +203,7 @@ class TabletCopySourceSession : public 
RefCountedThreadSafe<TabletCopySourceSess
                         TabletCopyErrorPB::Code* error_code);
 
   const std::string tablet_id_;
+  std::string superblock_tmp_path_;
   FsManager* const fs_manager_;
 
   // Protects concurrent access to Init().

Reply via email to