This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 28f8e97 [tools] Add tool to copy table data to another table
28f8e97 is described below
commit 28f8e972fcd042a26a502cbb1f1102c487c9398d
Author: Yingchun Lai <[email protected]>
AuthorDate: Sat Feb 23 11:54:25 2019 +0800
[tools] Add tool to copy table data to another table
Copy table data to another table; the two tables could be in the same
cluster or not. The two tables must have the same table schema, but
could have different partition schemas. Alternatively, the tool can
create the new table using the same table and partition schema as the
source table.
This tool is useful for copying small tables, or for conveniently copying
just a table's schema. To copy large tables, it is recommended to use the
Java client's Backup/Restore features instead.
Change-Id: Ifdec51701ac9ec57739b1a6f7c18786294642a54
Reviewed-on: http://gerrit.cloudera.org:8080/12563
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
---
src/kudu/client/scan_batch.h | 2 +
src/kudu/client/schema.cc | 2 +-
src/kudu/common/partial_row.h | 5 +
src/kudu/common/partition.h | 39 +--
src/kudu/common/schema.cc | 2 +-
src/kudu/common/schema.h | 2 -
src/kudu/integration-tests/test_workload.cc | 10 +-
src/kudu/tools/data_gen_util.cc | 9 +
src/kudu/tools/kudu-admin-test.cc | 1 -
src/kudu/tools/kudu-tool-test.cc | 396 ++++++++++++++++++++++++++++
src/kudu/tools/table_scanner.cc | 301 ++++++++++++++++++---
src/kudu/tools/table_scanner.h | 40 ++-
src/kudu/tools/tool_action_common.cc | 12 +-
src/kudu/tools/tool_action_common.h | 6 +
src/kudu/tools/tool_action_perf.cc | 4 +-
src/kudu/tools/tool_action_table.cc | 51 +++-
16 files changed, 813 insertions(+), 69 deletions(-)
diff --git a/src/kudu/client/scan_batch.h b/src/kudu/client/scan_batch.h
index f381ae2..e975e91 100644
--- a/src/kudu/client/scan_batch.h
+++ b/src/kudu/client/scan_batch.h
@@ -42,6 +42,7 @@ class Schema;
namespace tools {
class ReplicaDumper;
+class TableScanner;
} // namespace tools
namespace client {
@@ -294,6 +295,7 @@ class KUDU_EXPORT KuduScanBatch::RowPtr {
private:
friend class KuduScanBatch;
+ friend class tools::TableScanner;
template<typename KeyTypeWrapper> friend struct SliceKeysTestSetup;
template<typename KeyTypeWrapper> friend struct IntKeysTestSetup;
diff --git a/src/kudu/client/schema.cc b/src/kudu/client/schema.cc
index c75a293..3ce5a1b 100644
--- a/src/kudu/client/schema.cc
+++ b/src/kudu/client/schema.cc
@@ -759,7 +759,7 @@ string KuduSchema::ToString() const {
}
KuduSchema KuduSchema::FromSchema(const Schema& schema) {
- return KuduSchema(schema);
+ return KuduSchema(schema.CopyWithoutColumnIds());
}
Schema KuduSchema::ToSchema(const KuduSchema& kudu_schema) {
diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h
index 11698f4..d43d109 100644
--- a/src/kudu/common/partial_row.h
+++ b/src/kudu/common/partial_row.h
@@ -52,6 +52,10 @@ namespace tablet {
template<typename KeyTypeWrapper> struct NumTypeRowOps; // IWYU pragma:
keep
} // namespace tablet
+namespace tools {
+class TableScanner;
+} // namespace tools
+
/// @endcond
class Schema;
@@ -500,6 +504,7 @@ class KUDU_EXPORT KuduPartialRow {
friend class PartitionSchema;
friend class RowOperationsPBDecoder;
friend class RowOperationsPBEncoder;
+ friend class tools::TableScanner;
friend class TestScanSpec;
template<typename KeyTypeWrapper> friend struct client::SliceKeysTestSetup;
template<typename KeyTypeWrapper> friend struct client::IntKeysTestSetup;
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index 88f4bdb..47baf93 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -136,6 +136,15 @@ class Partition {
// the methods which format individual partition keys do redact.
class PartitionSchema {
public:
+ struct RangeSchema {  // Range component of a partition schema; exposed through range_partition_schema().
+ std::vector<ColumnId> column_ids;  // IDs of the columns that make up the range partition key.
+ };
+
+ struct HashBucketSchema {  // One hash component of a partition schema; exposed through hash_partition_schemas().
+ std::vector<ColumnId> column_ids;  // IDs of the columns covered by this hash component.
+ int32_t num_buckets;  // NOTE(review): presumably the number of hash buckets for this component -- confirm.
+ uint32_t seed;  // NOTE(review): presumably the seed given to the hash function -- confirm.
+ };
// Deserializes a protobuf message into a partition schema.
static Status FromPB(const PartitionSchemaPB& pb,
@@ -234,21 +243,25 @@ class PartitionSchema {
// contain unredacted row data.
Status MakeUpperBoundRangePartitionKeyExclusive(KuduPartialRow* row) const;
+ // Decodes a range partition key into a partial row, with variable-length
+ // fields stored in the arena.
+ Status DecodeRangeKey(Slice* encode_key,
+ KuduPartialRow* partial_row,
+ Arena* arena) const;
+
+ const RangeSchema& range_partition_schema() const {  // Read-only access to the range component (range_schema_).
+ return range_schema_;
+ }
+
+ const std::vector<HashBucketSchema>& hash_partition_schemas() const {  // Read-only access to all hash components (hash_bucket_schemas_).
+ return hash_bucket_schemas_;
+ }
+
private:
friend class PartitionPruner;
FRIEND_TEST(PartitionTest, TestIncrementRangePartitionBounds);
FRIEND_TEST(PartitionTest, TestIncrementRangePartitionStringBounds);
- struct RangeSchema {
- std::vector<ColumnId> column_ids;
- };
-
- struct HashBucketSchema {
- std::vector<ColumnId> column_ids;
- int32_t num_buckets;
- uint32_t seed;
- };
-
// Returns a text description of the encoded range key suitable for debug
printing.
std::string RangeKeyDebugString(Slice range_key, const Schema& schema) const;
std::string RangeKeyDebugString(const KuduPartialRow& key) const;
@@ -318,12 +331,6 @@ class PartitionSchema {
// This method is useful used for encoding splits and bounds.
Status EncodeRangeKey(const KuduPartialRow& row, const Schema& schema,
std::string* key) const;
- // Decodes a range partition key into a partial row, with variable-length
- // fields stored in the arena.
- Status DecodeRangeKey(Slice* encode_key,
- KuduPartialRow* partial_row,
- Arena* arena) const;
-
// Decodes the hash bucket component of a partition key into its buckets.
//
// This should only be called with partition keys created from a row, not
with
diff --git a/src/kudu/common/schema.cc b/src/kudu/common/schema.cc
index c5ca4f4..3cb3c98 100644
--- a/src/kudu/common/schema.cc
+++ b/src/kudu/common/schema.cc
@@ -345,7 +345,6 @@ Schema Schema::CopyWithColumnIds() const {
}
Schema Schema::CopyWithoutColumnIds() const {
- CHECK(has_column_ids());
return Schema(cols_, num_key_columns_);
}
@@ -430,6 +429,7 @@ string Schema::ToString(ToStringMode mode) const {
if (cols_.empty()) return "()";
vector<string> pk_strs;
+ pk_strs.reserve(num_key_columns_);
for (int i = 0; i < num_key_columns_; i++) {
pk_strs.push_back(cols_[i].name());
}
diff --git a/src/kudu/common/schema.h b/src/kudu/common/schema.h
index 9886a61..faf83f1 100644
--- a/src/kudu/common/schema.h
+++ b/src/kudu/common/schema.h
@@ -696,8 +696,6 @@ class Schema {
// Return a new Schema which is the same as this one, but without any column
// IDs assigned.
- //
- // Requires that this schema has column IDs.
Schema CopyWithoutColumnIds() const;
// Create a new schema containing only the selected columns.
diff --git a/src/kudu/integration-tests/test_workload.cc
b/src/kudu/integration-tests/test_workload.cc
index 4fd05e8..7b9863e 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -93,12 +93,11 @@ void TestWorkload::set_schema(const client::KuduSchema&
schema) {
CHECK_GT(schema.num_columns(), 0) << "Schema should have at least one
column";
std::vector<int> key_indexes;
schema.GetPrimaryKeyColumnIndexes(&key_indexes);
- CHECK_EQ(1, key_indexes.size()) << "Schema should have just one key column";
- CHECK_EQ(0, key_indexes[0]) << "Schema's key column should be index 0";
+ CHECK_LE(1, key_indexes.size()) << "Schema should have at least one key
column";
+ CHECK_EQ(0, key_indexes[0]) << "Schema's first key column should be index 0";
KuduColumnSchema key = schema.Column(0);
- CHECK_EQ("key", key.name()) << "Schema column should be named 'key'";
CHECK_EQ(KuduColumnSchema::INT32, key.type())
- << "Schema key column should be of type INT32";
+ << "Schema's first key column should be of type INT32";
schema_ = schema;
}
@@ -341,6 +340,9 @@ void TestWorkload::Start() {
Status TestWorkload::Cleanup() {
// Should be run only when workload is inactive.
CHECK(!should_run_.Load() && threads_.empty());
+ if (!client_) {
+ CHECK_OK(cluster_->CreateClient(&client_builder_, &client_));
+ }
return client_->DeleteTable(table_name_);
}
diff --git a/src/kudu/tools/data_gen_util.cc b/src/kudu/tools/data_gen_util.cc
index 4c10fea..81c7d82 100644
--- a/src/kudu/tools/data_gen_util.cc
+++ b/src/kudu/tools/data_gen_util.cc
@@ -61,6 +61,15 @@ void WriteValueToColumn(const client::KuduSchema& schema,
case client::KuduColumnSchema::BOOL:
CHECK_OK(row->SetBool(col_idx, value));
break;
+ case client::KuduColumnSchema::BINARY:
+ CHECK_OK(row->SetBinaryCopy(col_idx, FastHex64ToBuffer(value, buf)));
+ break;
+ case client::KuduColumnSchema::UNIXTIME_MICROS:
+ CHECK_OK(row->SetUnixTimeMicros(col_idx, value));
+ break;
+ case client::KuduColumnSchema::DECIMAL:
+ CHECK_OK(row->SetUnscaledDecimal(col_idx, value));
+ break;
default:
LOG(FATAL) << "Unexpected data type: " << type;
}
diff --git a/src/kudu/tools/kudu-admin-test.cc
b/src/kudu/tools/kudu-admin-test.cc
index a5c8b0c..d35e4c2 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -89,7 +89,6 @@ using kudu::client::KuduInsert;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduTable;
-using kudu::client::KuduTableAlterer;
using kudu::client::KuduTableCreator;
using kudu::client::KuduValue;
using kudu::client::sp::shared_ptr;
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 5f6ab9b..871b2fa 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -24,6 +24,7 @@
#include <iterator>
#include <map>
#include <memory>
+#include <set>
#include <sstream>
#include <string>
#include <tuple> // IWYU pragma: keep
@@ -47,6 +48,7 @@
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h"
+#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
@@ -141,6 +143,7 @@ using kudu::cfile::StringDataGenerator;
using kudu::cfile::WriterOptions;
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
+using kudu::client::KuduColumnStorageAttributes;
using kudu::client::KuduInsert;
using kudu::client::KuduScanToken;
using kudu::client::KuduScanTokenBuilder;
@@ -148,6 +151,8 @@ using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduSession;
using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
+using kudu::client::KuduValue;
using kudu::client::sp::shared_ptr;
using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalMiniClusterOptions;
@@ -188,6 +193,7 @@ using std::map;
using std::max;
using std::ostringstream;
using std::pair;
+using std::set;
using std::string;
using std::to_string;
using std::unique_ptr;
@@ -442,6 +448,146 @@ class ToolTest : public KuduTest {
}
}
+ enum class TableCopyMode {  // Scenario exercised by RunCopyTableCheck().
+ INSERT_TO_EXIST_TABLE = 0,  // Destination table is created up front (no writer threads); copy uses -write_type=insert -create_table=false.
+ INSERT_TO_NOT_EXIST_TABLE = 1,  // Destination table is not created up front; copy uses -write_type=insert -create_table=true.
+ UPSERT_TO_EXIST_TABLE = 2,  // Destination table is created and given some rows first; copy uses -write_type=upsert -create_table=false.
+ COPY_SCHEMA_ONLY = 3,  // Copy uses an empty -write_type with -create_table=true; the destination is expected to hold 0 rows.
+ };
+
+ struct RunCopyTableCheckArgs {  // Inputs for a single RunCopyTableCheck() run.
+ string src_table_name;  // Table to copy from.
+ string predicates_json;  // Passed as -predicates to 'table copy' and to the scan of the source table.
+ int64_t min_value;  // With max_value, gives the expected "Total count": max(max_value - min_value + 1, 0).
+ int64_t max_value;  // See min_value.
+ string columns;  // Passed as -columns when scanning the source and destination tables for comparison.
+ TableCopyMode mode;  // Which copy scenario to run.
+ };
+
+  // Runs 'kudu table copy' from 'args.src_table_name' to the destination table
+  // "kudu.table.copy.to" on the same master, in the scenario selected by
+  // 'args.mode', and verifies the result:
+  //  - the "Total count" line printed by the tool (which must be absent when
+  //    only the schema is copied);
+  //  - when the tool had to create the destination table, that
+  //    'table describe' prints the same schema for both tables;
+  //  - that the scan-output lines containing "key" are the same set for the
+  //    source scan (filtered by 'args.predicates_json') and the destination
+  //    scan, or that the destination is empty in COPY_SCHEMA_ONLY mode.
+  // The destination table is dropped at the end.
+  //
+  // This function uses fatal gtest assertions, so callers should wrap the call
+  // in NO_FATALS().
+  void RunCopyTableCheck(const RunCopyTableCheckArgs& args) {
+    const string kDstTableName = "kudu.table.copy.to";
+    const string master_addr = cluster_->master()->bound_rpc_addr().ToString();
+
+    // Prepare command flags, create destination table and write some data if needed.
+    string write_type;
+    string create_table;
+    TestWorkload ww(cluster_.get());
+    ww.set_table_name(kDstTableName);
+    ww.set_num_replicas(1);
+    switch (args.mode) {
+      case TableCopyMode::INSERT_TO_EXIST_TABLE:
+        write_type = "insert";
+        create_table = "false";
+        // Create the dst table.
+        ww.set_num_write_threads(0);
+        ww.Setup();
+        break;
+      case TableCopyMode::INSERT_TO_NOT_EXIST_TABLE:
+        write_type = "insert";
+        create_table = "true";
+        break;
+      case TableCopyMode::UPSERT_TO_EXIST_TABLE:
+        write_type = "upsert";
+        create_table = "false";
+        // Create the dst table and write some data to it.
+        ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+        ww.set_num_write_threads(1);
+        ww.set_write_batch_size(1);
+        ww.Setup();
+        ww.Start();
+        ASSERT_EVENTUALLY([&]() {
+          ASSERT_GE(ww.rows_inserted(), 100);
+        });
+        ww.StopAndJoin();
+        break;
+      case TableCopyMode::COPY_SCHEMA_ONLY:
+        write_type = "";
+        create_table = "true";
+        break;
+      default:
+        FAIL() << "Unknown TableCopyMode";
+    }
+
+    // Execute copy command.
+    string stdout;
+    NO_FATALS(RunActionStdoutString(
+        Substitute("table copy $0 $1 $2 -dst_table=$3 -predicates=$4 -write_type=$5 "
+                   "-create_table=$6",
+                   master_addr,
+                   args.src_table_name,
+                   master_addr,
+                   kDstTableName,
+                   args.predicates_json,
+                   write_type,
+                   create_table),
+        &stdout));
+
+    // Check total count. The explicit template argument keeps this compiling
+    // on platforms where int64_t is 'long long' rather than 'long'.
+    const int64_t total = max<int64_t>(args.max_value - args.min_value + 1, 0);
+    if (args.mode == TableCopyMode::COPY_SCHEMA_ONLY) {
+      ASSERT_STR_NOT_CONTAINS(stdout, "Total count ");
+    } else {
+      ASSERT_STR_CONTAINS(stdout, Substitute("Total count $0 ", total));
+    }
+
+    // Check schema equals when destination table is created automatically.
+    if (args.mode == TableCopyMode::INSERT_TO_NOT_EXIST_TABLE ||
+        args.mode == TableCopyMode::COPY_SCHEMA_ONLY) {
+      vector<string> src_schema;
+      NO_FATALS(RunActionStdoutLines(
+          Substitute("table describe $0 $1 -show_attributes=true",
+                     master_addr, args.src_table_name), &src_schema));
+
+      vector<string> dst_schema;
+      NO_FATALS(RunActionStdoutLines(
+          Substitute("table describe $0 $1 -show_attributes=true",
+                     master_addr, kDstTableName), &dst_schema));
+
+      // Remove the first lines, which are the different table names. Erasing
+      // begin() of an empty vector is undefined, so check for output first.
+      ASSERT_FALSE(src_schema.empty());
+      ASSERT_FALSE(dst_schema.empty());
+      src_schema.erase(src_schema.begin());
+      dst_schema.erase(dst_schema.begin());
+      ASSERT_EQ(src_schema, dst_schema);
+    }
+
+    // Check all values.
+    {
+      vector<string> src_lines;
+      NO_FATALS(RunActionStdoutLines(
+          Substitute("table scan $0 $1 -show_values=true "
+                     "-columns=$2 -predicates=$3 -num_threads=1",
+                     master_addr, args.src_table_name, args.columns, args.predicates_json),
+          &src_lines));
+
+      vector<string> dst_lines;
+      NO_FATALS(RunActionStdoutLines(
+          Substitute("table scan $0 $1 -show_values=true "
+                     "-columns=$2 -num_threads=1",
+                     master_addr, kDstTableName, args.columns),
+          &dst_lines));
+
+      if (args.mode == TableCopyMode::COPY_SCHEMA_ONLY) {
+        ASSERT_GT(dst_lines.size(), 1);
+        ASSERT_STR_CONTAINS(*dst_lines.rbegin(), "Total count 0 ");
+      } else {
+        // Every source line that contains "key" must also be in the destination.
+        // Matched lines are removed so that any leftover destination line
+        // containing "key" is detected below.
+        set<string> unmatched_dst_lines(dst_lines.begin(), dst_lines.end());
+        for (const auto& src_line : src_lines) {
+          if (src_line.find("key") != string::npos) {
+            ASSERT_TRUE(ContainsKey(unmatched_dst_lines, src_line));
+            unmatched_dst_lines.erase(src_line);
+          }
+        }
+        for (const auto& dst_line : unmatched_dst_lines) {
+          ASSERT_STR_NOT_CONTAINS(dst_line, "key");
+        }
+      }
+    }
+
+    // Drop dst table; a failed drop must not go unnoticed.
+    ASSERT_OK(ww.Cleanup());
+  }
+
protected:
void RunLoadgen(int num_tservers = 1,
const vector<string>& tool_args = {},
@@ -467,6 +613,248 @@ class ToolTestKerberosParameterized : public ToolTest,
public ::testing::WithPar
INSTANTIATE_TEST_CASE_P(ToolTestKerberosParameterized,
ToolTestKerberosParameterized,
::testing::Values(false, true));
+enum RunCopyTableCheckArgsType {  // Test-case selector; passed as the int parameter of ToolTestCopyTableParameterized.
+ kTestCopyTableDstTableExist,  // Copy with TableCopyMode::INSERT_TO_EXIST_TABLE.
+ kTestCopyTableDstTableNotExist,  // Copy with TableCopyMode::INSERT_TO_NOT_EXIST_TABLE.
+ kTestCopyTableUpsert,  // Copy with TableCopyMode::UPSERT_TO_EXIST_TABLE.
+ kTestCopyTableSchemaOnly,  // Copy with TableCopyMode::COPY_SCHEMA_ONLY.
+ kTestCopyTableComplexSchema,  // Source table is built by CreateComplexSchema(); destination is created by the tool.
+ kTestCopyTablePredicates  // Several copies, each with a different -predicates value.
+};
+// Subclass of ToolTest that allows running individual test cases with different
+// parameters to run the 'kudu table copy' CLI tool.
+//
+// The int test parameter is a RunCopyTableCheckArgsType value. SetUp() creates
+// and fills the source table for that case; GenerateArgs() then yields the
+// RunCopyTableCheck() argument sets to run against it.
+class ToolTestCopyTableParameterized :
+    public ToolTest,
+    public ::testing::WithParamInterface<int> {
+ public:
+  // Starts an external mini cluster and writes at least 100 rows into the
+  // source table 'kTableName'. The number of rows written is remembered in
+  // 'total_rows_'.
+  void SetUp() override {
+    test_case_ = GetParam();
+    NO_FATALS(StartExternalMiniCluster());
+
+    // Create the src table and write some data to it.
+    TestWorkload ww(cluster_.get());
+    ww.set_table_name(kTableName);
+    ww.set_num_replicas(1);
+    ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+    ww.set_num_write_threads(1);
+    // Create a complex schema if needed, or use a default simple schema.
+    if (test_case_ == kTestCopyTableComplexSchema) {
+      KuduSchema schema;
+      ASSERT_OK(CreateComplexSchema(&schema));
+      ww.set_schema(schema);
+    }
+    ww.Setup();
+    ww.Start();
+    ASSERT_EVENTUALLY([&]() {
+      ASSERT_GE(ww.rows_inserted(), 100);
+    });
+    ww.StopAndJoin();
+    total_rows_ = ww.rows_inserted();
+
+    // Insert one more row with a NULL cell if needed. The helper uses fatal
+    // assertions, so NO_FATALS is required for a failure to stop SetUp().
+    if (test_case_ == kTestCopyTablePredicates) {
+      NO_FATALS(InsertOneRowWithNullCell());
+    }
+  }
+
+  // Returns the RunCopyTableCheck() argument sets for the current test case.
+  //
+  // Every set starts from the same base: copy 'kTableName' without predicates
+  // into an existing table, with expected value range [1, total_rows_] and
+  // 'kSimpleSchemaColumns' as the columns to compare. Each case overrides only
+  // what it needs.
+  std::vector<RunCopyTableCheckArgs> GenerateArgs() {
+    RunCopyTableCheckArgs args = { kTableName,
+                                   "",
+                                   1,
+                                   total_rows_,
+                                   kSimpleSchemaColumns,
+                                   TableCopyMode::INSERT_TO_EXIST_TABLE };
+    switch (test_case_) {
+      case kTestCopyTableDstTableExist:
+        return { args };
+      case kTestCopyTableDstTableNotExist:
+        args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE;
+        return { args };
+      case kTestCopyTableUpsert:
+        args.mode = TableCopyMode::UPSERT_TO_EXIST_TABLE;
+        return { args };
+      case kTestCopyTableSchemaOnly:
+        args.mode = TableCopyMode::COPY_SCHEMA_ONLY;
+        return { args };
+      case kTestCopyTableComplexSchema:
+        args.columns = kComplexSchemaColumns;
+        args.mode = TableCopyMode::INSERT_TO_NOT_EXIST_TABLE;
+        return { args };
+      case kTestCopyTablePredicates: {
+        const auto mid = total_rows_ / 2;
+        std::vector<RunCopyTableCheckArgs> multi_args;
+        // Appends a copy of the base 'args' with the given expected value range
+        // and predicates.
+        const auto add_case = [&](int64_t min_value,
+                                  int64_t max_value,
+                                  string predicates_json) {
+          RunCopyTableCheckArgs case_args = args;
+          case_args.min_value = min_value;
+          case_args.max_value = max_value;
+          case_args.predicates_json = std::move(predicates_json);
+          multi_args.emplace_back(std::move(case_args));
+        };
+
+        // No predicates at all.
+        add_case(args.min_value, args.max_value, "");
+        // Comparison predicates on the key.
+        add_case(args.min_value, 1, R"*(["AND",["=","key",1]])*");
+        add_case(mid + 1, args.max_value, Substitute(R"*(["AND",[">","key",$0]])*", mid));
+        add_case(mid, args.max_value, Substitute(R"*(["AND",[">=","key",$0]])*", mid));
+        add_case(args.min_value, mid - 1, Substitute(R"*(["AND",["<","key",$0]])*", mid));
+        add_case(args.min_value, mid, Substitute(R"*(["AND",["<=","key",$0]])*", mid));
+        // IN-list predicate.
+        add_case(args.min_value, 5, R"*(["AND",["IN","key",[1,2,3,4,5]]])*");
+        // Null-ness predicates: only the row added by InsertOneRowWithNullCell()
+        // (key == total_rows_) was written without a 'string_val'.
+        add_case(args.min_value, total_rows_ - 1, R"*(["AND",["NOTNULL","string_val"]])*");
+        add_case(total_rows_, args.max_value, R"*(["AND",["NULL","string_val"]])*");
+        // Several predicates combined.
+        add_case(args.min_value, 3,
+                 R"*(["AND",["IN","key",[0,1,2,3]],)*"
+                 R"*(["<","key",8],[">=","key",1],["NOTNULL","key"],)*"
+                 R"*(["NOTNULL","string_val"]])*");
+        return multi_args;
+      }
+      default:
+        LOG(FATAL) << "Unknown type " << test_case_;
+    }
+    return {};
+  }
+
+ private:
+  // Builds a 15-column schema with a four-column primary key into '*schema',
+  // then creates the table 'kTableName' with that schema, two hash partition
+  // levels, range splits on 'key_range' at 0, 1, 2 and 3, and one replica.
+  // Returns the first error from connecting, building the schema or creating
+  // the table.
+  Status CreateComplexSchema(KuduSchema* schema) {
+    shared_ptr<KuduClient> client;
+    RETURN_NOT_OK(cluster_->CreateClient(nullptr, &client));
+    // Build the schema.
+    {
+      KuduSchemaBuilder builder;
+      builder.AddColumn("key_hash0")->Type(client::KuduColumnSchema::INT32)->NotNull();
+      builder.AddColumn("key_hash1")->Type(client::KuduColumnSchema::INT32)->NotNull();
+      builder.AddColumn("key_hash2")->Type(client::KuduColumnSchema::INT32)->NotNull();
+      builder.AddColumn("key_range")->Type(client::KuduColumnSchema::INT32)->NotNull();
+      builder.AddColumn("int8_val")->Type(client::KuduColumnSchema::INT8)
+        ->Compression(KuduColumnStorageAttributes::CompressionType::NO_COMPRESSION)
+        ->Encoding(KuduColumnStorageAttributes::EncodingType::PLAIN_ENCODING);
+      builder.AddColumn("int16_val")->Type(client::KuduColumnSchema::INT16)
+        ->Compression(KuduColumnStorageAttributes::CompressionType::SNAPPY)
+        ->Encoding(KuduColumnStorageAttributes::EncodingType::RLE);
+      builder.AddColumn("int32_val")->Type(client::KuduColumnSchema::INT32)
+        ->Compression(KuduColumnStorageAttributes::CompressionType::LZ4)
+        ->Encoding(KuduColumnStorageAttributes::EncodingType::BIT_SHUFFLE);
+      builder.AddColumn("int64_val")->Type(client::KuduColumnSchema::INT64)
+        ->Compression(KuduColumnStorageAttributes::CompressionType::ZLIB)
+        ->Default(KuduValue::FromInt(123));
+      builder.AddColumn("timestamp_val")->Type(client::KuduColumnSchema::UNIXTIME_MICROS);
+      builder.AddColumn("string_val")->Type(client::KuduColumnSchema::STRING)
+        ->Encoding(KuduColumnStorageAttributes::EncodingType::PREFIX_ENCODING)
+        ->Default(KuduValue::CopyString(Slice("hello")));
+      builder.AddColumn("bool_val")->Type(client::KuduColumnSchema::BOOL)
+        ->Default(KuduValue::FromBool(false));
+      builder.AddColumn("float_val")->Type(client::KuduColumnSchema::FLOAT);
+      builder.AddColumn("double_val")->Type(client::KuduColumnSchema::DOUBLE)
+        ->Default(KuduValue::FromDouble(123.4));
+      builder.AddColumn("binary_val")->Type(client::KuduColumnSchema::BINARY)
+        ->Encoding(KuduColumnStorageAttributes::EncodingType::DICT_ENCODING);
+      builder.AddColumn("decimal_val")->Type(client::KuduColumnSchema::DECIMAL)
+        ->Precision(30)
+        ->Scale(4);
+      builder.SetPrimaryKey({"key_hash0", "key_hash1", "key_hash2", "key_range"});
+      RETURN_NOT_OK(builder.Build(schema));
+    }
+
+    // Set up partitioning and create the table.
+    {
+      unique_ptr<KuduPartialRow> bound0(schema->NewRow());
+      RETURN_NOT_OK(bound0->SetInt32("key_range", 0));
+      unique_ptr<KuduPartialRow> bound1(schema->NewRow());
+      RETURN_NOT_OK(bound1->SetInt32("key_range", 1));
+      unique_ptr<KuduPartialRow> bound2(schema->NewRow());
+      RETURN_NOT_OK(bound2->SetInt32("key_range", 2));
+      unique_ptr<KuduPartialRow> bound3(schema->NewRow());
+      RETURN_NOT_OK(bound3->SetInt32("key_range", 3));
+      unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+      // NOTE(review): the split rows are release()d into the creator, which is
+      // presumably expected to take ownership of them -- confirm.
+      RETURN_NOT_OK(table_creator->table_name(kTableName)
+                    .schema(schema)
+                    .add_hash_partitions({"key_hash0"}, 2)
+                    .add_hash_partitions({"key_hash1", "key_hash2"}, 3)
+                    .set_range_partition_columns({"key_range"})
+                    .add_range_partition_split(bound0.release())
+                    .add_range_partition_split(bound1.release())
+                    .add_range_partition_split(bound2.release())
+                    .add_range_partition_split(bound3.release())
+                    .num_replicas(1)
+                    .Create());
+    }
+
+    return Status::OK();
+  }
+
+  // Inserts one extra row into 'kTableName' with key == ++total_rows_ and
+  // int_val == 1; 'string_val' is deliberately left unset. Uses fatal gtest
+  // assertions, so call it through NO_FATALS().
+  void InsertOneRowWithNullCell() {
+    shared_ptr<KuduClient> client;
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+    shared_ptr<KuduSession> session = client->NewSession();
+    session->SetTimeoutMillis(20000);
+    ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client->OpenTable(kTableName, &table));
+    unique_ptr<KuduInsert> insert(table->NewInsert());
+    ASSERT_OK(insert->mutable_row()->SetInt32("key", ++total_rows_));
+    ASSERT_OK(insert->mutable_row()->SetInt32("int_val", 1));
+    ASSERT_OK(session->Apply(insert.release()));
+    ASSERT_OK(session->Flush());
+  }
+
+  // Name of the source table.
+  static const char kTableName[];
+  // Comma-separated column lists given to -columns for the two source schemas.
+  static const char kSimpleSchemaColumns[];
+  static const char kComplexSchemaColumns[];
+  // RunCopyTableCheckArgsType value of the running test, read in SetUp().
+  int test_case_ = 0;
+  // Number of rows written to the source table, including the extra row added
+  // by InsertOneRowWithNullCell().
+  int64_t total_rows_ = 0;
+};
+const char ToolTestCopyTableParameterized::kTableName[] =
"ToolTestCopyTableParameterized";  // Name of the source table used by every test case.
+const char ToolTestCopyTableParameterized::kSimpleSchemaColumns[] =
"key,int_val,string_val";  // -columns value used with the default simple schema.
+const char ToolTestCopyTableParameterized::kComplexSchemaColumns[]
+ =
"key_hash0,key_hash1,key_hash2,key_range,int8_val,int16_val,int32_val,int64_val,"
+
"timestamp_val,string_val,bool_val,float_val,double_val,binary_val,decimal_val";  // -columns value naming every column added in CreateComplexSchema().
+
+INSTANTIATE_TEST_CASE_P(CopyTableParameterized,
+ ToolTestCopyTableParameterized,
+ ::testing::Values(kTestCopyTableDstTableExist,
+ kTestCopyTableDstTableNotExist,
+ kTestCopyTableUpsert,
+ kTestCopyTableSchemaOnly,
+ kTestCopyTableComplexSchema,
+ kTestCopyTablePredicates));  // One instantiation per RunCopyTableCheckArgsType value.
+
void ToolTest::StartExternalMiniCluster(ExternalMiniClusterOptions opts) {
cluster_.reset(new ExternalMiniCluster(std::move(opts)));
ASSERT_OK(cluster_->Start());
@@ -656,6 +1044,7 @@ TEST_F(ToolTest, TestModeHelp) {
"rename_column.*Rename a column",
"list.*List tables",
"scan.*Scan rows from a table",
+ "copy.*Copy table data to another table",
};
NO_FATALS(RunTestHelp("table", kTableModeRegexes));
}
@@ -2348,6 +2737,7 @@ TEST_F(ToolTest, TestMasterList) {
// (3)rename a column
// (4)list tables
// (5)scan a table
+// (6)copy a table
TEST_F(ToolTest, TestDeleteTable) {
NO_FATALS(StartExternalMiniCluster());
shared_ptr<KuduClient> client;
@@ -2672,6 +3062,12 @@ TEST_F(ToolTest, TestScanTableMultiPredicates) {
ASSERT_LE(lines.size(), mid);
}
+// Runs the copy check once for every argument set generated for the current
+// test case, stopping at the first fatal failure.
+TEST_P(ToolTestCopyTableParameterized, TestCopyTable) {
+  const auto all_check_args = GenerateArgs();
+  for (const auto& check_args : all_check_args) {
+    NO_FATALS(RunCopyTableCheck(check_args));
+  }
+}
+
Status CreateLegacyHmsTable(HmsClient* client,
const string& hms_database_name,
const string& hms_table_name,
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
index 8d46f5b..9ed6e16 100644
--- a/src/kudu/tools/table_scanner.cc
+++ b/src/kudu/tools/table_scanner.cc
@@ -17,13 +17,17 @@
#include "kudu/tools/table_scanner.h"
-#include <stddef.h>
-
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
#include <iostream>
+#include <iterator>
#include <map>
#include <memory>
#include <set>
+#include <boost/algorithm/string/predicate.hpp>
#include <boost/bind.hpp>
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
@@ -36,28 +40,40 @@
#include "kudu/client/scan_predicate.h"
#include "kudu/client/schema.h"
#include "kudu/client/value.h"
+#include "kudu/client/write_op.h"
#include "kudu/common/column_predicate.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/partition.h"
+#include "kudu/common/row.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/bitmap.h"
#include "kudu/util/jsonreader.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/memory/arena.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/slice.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/string_case.h"
+using kudu::client::KuduClient;
using kudu::client::KuduColumnSchema;
+using kudu::client::KuduError;
using kudu::client::KuduPredicate;
using kudu::client::KuduScanBatch;
using kudu::client::KuduScanner;
using kudu::client::KuduScanToken;
using kudu::client::KuduScanTokenBuilder;
using kudu::client::KuduSchema;
+using kudu::client::KuduSession;
using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
using kudu::client::KuduValue;
-using strings::Substitute;
+using kudu::client::KuduWriteOperation;
using std::endl;
using std::map;
using std::ostream;
@@ -66,12 +82,14 @@ using std::set;
using std::string;
using std::unique_ptr;
using std::vector;
+using strings::Substitute;
+DEFINE_bool(create_table, true,
+ "Whether to create the destination table if it doesn't exist.");  // Read by CreateDstTableIfNeeded() when the destination table is not found.
DECLARE_string(columns);
DEFINE_bool(fill_cache, true,
"Whether to fill block cache when scanning.");
DECLARE_int32(num_threads);
-
DEFINE_string(predicates, "",
"Query predicates on columns. Unlike traditional SQL syntax, "
"the scan tool's simple query predicates are represented in a "
@@ -94,6 +112,31 @@ DEFINE_string(predicates, "",
DEFINE_bool(show_values, false,
"Whether to show values of scanned rows.");
DECLARE_string(tablets);
+DEFINE_string(write_type, "insert",
+ "How data should be copied to the destination table. Valid
values are 'insert', "
+ "'upsert' or the empty string. If the empty string, data will
not be copied "
+ "(useful when create_table is 'true').");  // Checked by ValidateWriteType(), registered through DEFINE_validator.
+
+// gflags validator for --write_type.
+//
+// Returns true if 'flag_value' equals one of the allowed values, compared
+// case-insensitively. Otherwise logs an error that names the offending value
+// and the allowed ones, and returns false.
+static bool ValidateWriteType(const char* flag_name,
+                              const string& flag_value) {
+  // The empty string is allowed on purpose; see the --write_type help text.
+  const vector<string> allowed_values = { "insert", "upsert", "" };
+  const bool is_allowed = std::any_of(
+      allowed_values.begin(), allowed_values.end(),
+      [&](const string& allowed_value) {
+        return boost::iequals(allowed_value, flag_value);
+      });
+  if (is_allowed) {
+    return true;
+  }
+
+  // Quote each allowed value so that the empty string shows up in the message
+  // instead of disappearing between separators.
+  std::ostringstream ss;
+  ss << "'" << flag_value << "': unsupported value for --" << flag_name
+     << " flag; should be one of";
+  for (const auto& allowed_value : allowed_values) {
+    ss << " '" << allowed_value << "'";
+  }
+  LOG(ERROR) << ss.str();
+
+  return false;
+}
+DEFINE_validator(write_type, &ValidateWriteType);
namespace kudu {
namespace tools {
@@ -158,17 +201,17 @@ KuduPredicate* NewComparisonPredicate(const
client::sp::shared_ptr<KuduTable>& t
const rapidjson::Value* value) {
KuduValue* kudu_value = ParseValue(type, value);
CHECK(kudu_value != nullptr);
- client::KuduPredicate::ComparisonOp cop;
+ KuduPredicate::ComparisonOp cop;
if (predicate_type == "<") {
- cop = client::KuduPredicate::ComparisonOp::LESS;
+ cop = KuduPredicate::ComparisonOp::LESS;
} else if (predicate_type == "<=") {
- cop = client::KuduPredicate::ComparisonOp::LESS_EQUAL;
+ cop = KuduPredicate::ComparisonOp::LESS_EQUAL;
} else if (predicate_type == "=") {
- cop = client::KuduPredicate::ComparisonOp::EQUAL;
+ cop = KuduPredicate::ComparisonOp::EQUAL;
} else if (predicate_type == ">") {
- cop = client::KuduPredicate::ComparisonOp::GREATER;
+ cop = KuduPredicate::ComparisonOp::GREATER;
} else if (predicate_type == ">=") {
- cop = client::KuduPredicate::ComparisonOp::GREATER_EQUAL;
+ cop = KuduPredicate::ComparisonOp::GREATER_EQUAL;
} else {
return nullptr;
}
@@ -188,10 +231,10 @@ KuduPredicate* NewIsNullPredicate(const
client::sp::shared_ptr<KuduTable>& table
}
}
-KuduPredicate* NewInListPredicate(const client::sp::shared_ptr<KuduTable>
&table,
+KuduPredicate* NewInListPredicate(const client::sp::shared_ptr<KuduTable>&
table,
KuduColumnSchema::DataType type,
- const string &name,
- const JsonReader &reader,
+ const string& name,
+ const JsonReader& reader,
const rapidjson::Value *object) {
CHECK(object->IsArray());
vector<const rapidjson::Value*> values;
@@ -291,13 +334,144 @@ Status AddPredicates(const client::sp::shared_ptr<KuduTable>& table,
return Status::OK();
}
-void TableScanner::ScannerTask(const vector<KuduScanToken *>& tokens) {
+Status CreateDstTableIfNeeded(const client::sp::shared_ptr<KuduTable>& src_table,
+ const client::sp::shared_ptr<KuduClient>& dst_client,
+ const string& dst_table_name) {
+ client::sp::shared_ptr<KuduTable> dst_table;
+ Status s = dst_client->OpenTable(dst_table_name, &dst_table);
+ if (!s.IsNotFound() && !s.ok()) {
+ return s;
+ }
+
+ // Destination table exists.
+ const KuduSchema& src_table_schema = src_table->schema();
+ if (s.ok()) {
+ if (src_table->id() == dst_table->id()) {
+ return Status::AlreadyPresent("Destination table is the same as the source table.");
+ }
+
+ const KuduSchema& dst_table_schema = dst_table->schema();
+ if (!src_table_schema.Equals(dst_table_schema)) {
+ return Status::NotSupported(
+ "Not support different schema of source table and destination table.");
+ }
+
+ return Status::OK();
+ }
+
+ // Destination table does NOT exist.
+ if (!FLAGS_create_table) {
+ return Status::NotFound(Substitute("Table $0 does not exist in the destination cluster.",
+ dst_table_name));
+ }
+
+ Schema schema_internal = KuduSchema::ToSchema(src_table_schema);
+ // Convert Schema to KuduSchema will drop internal ColumnIds.
+ KuduSchema dst_table_schema = KuduSchema::FromSchema(schema_internal);
+ const auto& partition_schema = src_table->partition_schema();
+
+ auto convert_column_ids_to_names = [&schema_internal] (const vector<ColumnId>& column_ids) {
+ vector<string> column_names;
+ column_names.reserve(column_ids.size());
+ for (const auto& column_id : column_ids) {
+ column_names.emplace_back(schema_internal.column_by_id(column_id).name());
+ }
+ return column_names;
+ };
+
+ // Table schema and replica number.
+ gscoped_ptr<KuduTableCreator> table_creator(dst_client->NewTableCreator());
+ table_creator->table_name(dst_table_name)
+ .schema(&dst_table_schema)
+ .num_replicas(src_table->num_replicas());
+
+ // Add hash partition schemas.
+ for (const auto& hash_partition_schema : partition_schema.hash_partition_schemas()) {
+ auto hash_columns = convert_column_ids_to_names(hash_partition_schema.column_ids);
+ table_creator->add_hash_partitions(hash_columns,
+ hash_partition_schema.num_buckets,
+ hash_partition_schema.seed);
+ }
+
+ // Add range partition schema.
+ if (!partition_schema.range_partition_schema().column_ids.empty()) {
+ auto range_columns
+ = convert_column_ids_to_names(partition_schema.range_partition_schema().column_ids);
+ table_creator->set_range_partition_columns(range_columns);
+ }
+
+ // Add range bounds for each range partition.
+ vector<Partition> partitions;
+ RETURN_NOT_OK(src_table->ListPartitions(&partitions));
+ for (const auto& partition : partitions) {
+ // Deduplicate by hash bucket to get a unique entry per range partition.
+ const auto& hash_buckets = partition.hash_buckets();
+ if (!std::all_of(hash_buckets.begin(),
+ hash_buckets.end(),
+ [](int32_t bucket) { return bucket == 0; })) {
+ continue;
+ }
+
+ // Partitions are considered metadata, so don't redact them.
+ ScopedDisableRedaction no_redaction;
+
+ Arena arena(256);
+ std::unique_ptr<KuduPartialRow> lower(new KuduPartialRow(&schema_internal));
+ std::unique_ptr<KuduPartialRow> upper(new KuduPartialRow(&schema_internal));
+ Slice range_key_start = partition.range_key_start();
+ Slice range_key_end = partition.range_key_end();
+ RETURN_NOT_OK(partition_schema.DecodeRangeKey(&range_key_start, lower.get(), &arena));
+ RETURN_NOT_OK(partition_schema.DecodeRangeKey(&range_key_end, upper.get(), &arena));
+
+ table_creator->add_range_partition(lower.release(), upper.release());
+ }
+
+ // Create table.
+ RETURN_NOT_OK(table_creator->Create());
+ LOG(INFO) << "Table " << dst_table_name << " created successfully";
+
+ return Status::OK();
+}
+
+void CheckPendingErrors(const client::sp::shared_ptr<KuduSession>& session) {
+ vector<KuduError*> errors;
+ ElementDeleter d(&errors);
+ session->GetPendingErrors(&errors, nullptr);
+ for (const auto& error : errors) {
+ LOG(ERROR) << error->status().ToString();
+ }
+}
+
+Status TableScanner::AddRow(const client::sp::shared_ptr<KuduTable>& table,
+ const KuduSchema& table_schema,
+ const KuduScanBatch::RowPtr& src_row,
+ const client::sp::shared_ptr<KuduSession>& session) {
+ unique_ptr<KuduWriteOperation> write_op;
+ if (FLAGS_write_type == "insert") {
+ write_op.reset(table->NewInsert());
+ } else if (FLAGS_write_type == "upsert") {
+ write_op.reset(table->NewUpsert());
+ } else {
+ LOG(FATAL) << Substitute("invalid write_type: $0", FLAGS_write_type);
+ }
+
+ KuduPartialRow* dst_row = write_op->mutable_row();
+ size_t row_size = ContiguousRowHelper::row_size(*src_row.schema_);
+ memcpy(dst_row->row_data_, src_row.row_data_, row_size);
+ BitmapChangeBits(dst_row->isset_bitmap_, 0, table_schema.num_columns(), true);
+
+ return session->Apply(write_op.release());
+}
+
+void TableScanner::ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
+ const std::function<void(const KuduScanBatch& batch)>& cb) {
for (auto token : tokens) {
Stopwatch sw(Stopwatch::THIS_THREAD);
sw.start();
- KuduScanner* scanner;
- CHECK_OK(token->IntoKuduScanner(&scanner));
+ KuduScanner* scanner_ptr;
+ CHECK_OK(token->IntoKuduScanner(&scanner_ptr));
+ unique_ptr<KuduScanner> scanner(scanner_ptr);
CHECK_OK(scanner->Open());
uint64_t count = 0;
@@ -306,14 +480,8 @@ void TableScanner::ScannerTask(const vector<KuduScanToken *>& tokens) {
CHECK_OK(scanner->NextBatch(&batch));
count += batch.NumRows();
total_count_.IncrementBy(batch.NumRows());
- if (out_ && FLAGS_show_values) {
- MutexLock l(output_lock_);
- for (const auto& row : batch) {
- *out_ << row.ToString() << endl;
- }
- }
+ cb(batch);
}
- delete scanner;
sw.stop();
if (out_) {
@@ -324,11 +492,44 @@ void TableScanner::ScannerTask(const vector<KuduScanToken *>& tokens) {
}
}
+void TableScanner::ScanTask(const vector<KuduScanToken *>& tokens) {
+ ScanData(tokens, [&](const KuduScanBatch& batch) {
+ if (out_ && FLAGS_show_values) {
+ MutexLock l(output_lock_);
+ for (const auto& row : batch) {
+ *out_ << row.ToString() << endl;
+ }
+ }
+ });
+}
+
+void TableScanner::CopyTask(const vector<KuduScanToken*>& tokens) {
+ client::sp::shared_ptr<KuduTable> dst_table;
+ CHECK_OK(dst_client_.get()->OpenTable(*dst_table_name_, &dst_table));
+ const KuduSchema& dst_table_schema = dst_table->schema();
+
+ // One session per thread.
+ client::sp::shared_ptr<KuduSession> session(dst_client_.get()->NewSession());
+ CHECK_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+ CHECK_OK(session->SetErrorBufferSpace(1024));
+ session->SetTimeoutMillis(30000);
+
+ ScanData(tokens, [&](const KuduScanBatch& batch) {
+ for (const auto& row : batch) {
+ CHECK_OK(AddRow(dst_table, dst_table_schema, row, session));
+ }
+ CheckPendingErrors(session);
+ // Flush here to make sure all write operations have been sent,
+ // and all strings reference to batch are still valid.
+ CHECK_OK(session->Flush());
+ });
+}
+
void TableScanner::MonitorTask() {
MonoTime last_log_time = MonoTime::Now();
while (thread_pool_->num_threads() > 1) { // Some other table scan thread is running.
if (MonoTime::Now() - last_log_time >= MonoDelta::FromSeconds(5)) {
- LOG(INFO) << "Scanned count: " << total_count_.Load() << endl;
+ LOG(INFO) << "Scanned count: " << total_count_.Load();
last_log_time = MonoTime::Now();
}
SleepFor(MonoDelta::FromMilliseconds(100));
@@ -343,31 +544,46 @@ void TableScanner::SetReadMode(KuduScanner::ReadMode mode) {
mode_ = mode;
}
-Status TableScanner::Run() {
- client::sp::shared_ptr<KuduTable> table;
- RETURN_NOT_OK(client_->OpenTable(table_name_, &table));
+Status TableScanner::StartWork(WorkType type) {
+ client::sp::shared_ptr<KuduTable> src_table;
+ RETURN_NOT_OK(client_->OpenTable(table_name_, &src_table));
- KuduScanTokenBuilder builder(table.get());
+ // Create destination table if needed.
+ if (type == WorkType::kCopy) {
+ RETURN_NOT_OK(CreateDstTableIfNeeded(src_table, *dst_client_, *dst_table_name_));
+ if (FLAGS_write_type.empty()) {
+ // Create table only.
+ return Status::OK();
+ }
+ }
+
+ KuduScanTokenBuilder builder(src_table.get());
RETURN_NOT_OK(builder.SetCacheBlocks(FLAGS_fill_cache));
if (mode_) {
RETURN_NOT_OK(builder.SetReadMode(mode_.get()));
}
RETURN_NOT_OK(builder.SetTimeoutMillis(30000));
- vector<string> projected_column_names = Split(FLAGS_columns, ",", strings::SkipEmpty());
- RETURN_NOT_OK(builder.SetProjectedColumnNames(projected_column_names));
- RETURN_NOT_OK(AddPredicates(table, builder));
+ // Set projection if needed.
+ if (type == WorkType::kScan) {
+ vector<string> projected_column_names = Split(FLAGS_columns, ",", strings::SkipEmpty());
+ RETURN_NOT_OK(builder.SetProjectedColumnNames(projected_column_names));
+ }
+
+ // Set predicates.
+ RETURN_NOT_OK(AddPredicates(src_table, builder));
vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
RETURN_NOT_OK(builder.Build(&tokens));
+ // Set tablet filter.
const set<string>& tablet_id_filters = Split(FLAGS_tablets, ",", strings::SkipWhitespace());
map<int, vector<KuduScanToken*>> thread_tokens;
int i = 0;
for (auto token : tokens) {
if (tablet_id_filters.empty() || ContainsKey(tablet_id_filters, token->tablet().id())) {
- thread_tokens[i++ % FLAGS_num_threads].push_back(token);
+ thread_tokens[i++ % FLAGS_num_threads].emplace_back(token);
}
}
@@ -379,8 +595,14 @@ Status TableScanner::Run() {
Stopwatch sw(Stopwatch::THIS_THREAD);
sw.start();
for (i = 0; i < FLAGS_num_threads; ++i) {
- RETURN_NOT_OK(thread_pool_->SubmitFunc(
- boost::bind(&TableScanner::ScannerTask, this, thread_tokens[i])));
+ if (type == WorkType::kScan) {
+ RETURN_NOT_OK(thread_pool_->SubmitFunc(
+ boost::bind(&TableScanner::ScanTask, this, thread_tokens[i])));
+ } else {
+ CHECK(type == WorkType::kCopy);
+ RETURN_NOT_OK(thread_pool_->SubmitFunc(
+ boost::bind(&TableScanner::CopyTask, this, thread_tokens[i])));
+ }
}
RETURN_NOT_OK(thread_pool_->SubmitFunc(boost::bind(&TableScanner::MonitorTask, this)));
thread_pool_->Wait();
@@ -395,5 +617,16 @@ Status TableScanner::Run() {
return Status::OK();
}
+Status TableScanner::StartScan() {
+ return StartWork(WorkType::kScan);
+}
+
+Status TableScanner::StartCopy() {
+ CHECK(dst_client_);
+ CHECK(dst_table_name_);
+
+ return StartWork(WorkType::kCopy);
+}
+
} // namespace tools
} // namespace kudu
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
index 89c2742..7b3377b 100644
--- a/src/kudu/tools/table_scanner.h
+++ b/src/kudu/tools/table_scanner.h
@@ -18,6 +18,7 @@
#pragma once
#include <cstdint>
+#include <functional>
#include <iosfwd>
#include <string>
#include <utility>
@@ -26,6 +27,7 @@
#include <boost/optional/optional.hpp>
#include "kudu/client/client.h"
+#include "kudu/client/scan_batch.h"
#include "kudu/client/shared_ptr.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/util/atomic.h"
@@ -34,13 +36,26 @@
#include "kudu/util/threadpool.h"
namespace kudu {
+namespace client {
+class KuduSchema;
+} // namespace client
+} // namespace kudu
+
+namespace kudu {
namespace tools {
+// This class is not thread-safe.
class TableScanner {
public:
- TableScanner(client::sp::shared_ptr<kudu::client::KuduClient> client, std::string table_name):
+ TableScanner(client::sp::shared_ptr<kudu::client::KuduClient> client,
+ std::string table_name,
+
boost::optional<client::sp::shared_ptr<kudu::client::KuduClient>> dst_client
+ = boost::none,
+ boost::optional<std::string> dst_table_name = boost::none):
total_count_(0),
client_(std::move(client)),
table_name_(std::move(table_name)),
+ dst_client_(std::move(dst_client)),
+ dst_table_name_(std::move(dst_table_name)),
out_(nullptr) {
}
@@ -51,20 +66,37 @@ class TableScanner {
// Set read mode, see KuduScanner::SetReadMode().
void SetReadMode(kudu::client::KuduScanner::ReadMode mode);
- Status Run();
+ Status StartScan();
+ Status StartCopy();
uint64_t TotalScannedCount() const {
return total_count_.Load();
}
private:
- void ScannerTask(const std::vector<kudu::client::KuduScanToken *>& tokens);
+ enum class WorkType {
+ kScan,
+ kCopy
+ };
+
+ Status StartWork(WorkType type);
+ void ScanData(const std::vector<kudu::client::KuduScanToken*>& tokens,
+ const std::function<void(const kudu::client::KuduScanBatch& batch)>& cb);
+ void ScanTask(const std::vector<kudu::client::KuduScanToken*>& tokens);
+ void CopyTask(const std::vector<kudu::client::KuduScanToken*>& tokens);
void MonitorTask();
- boost::optional<kudu::client::KuduScanner::ReadMode> mode_;
+ Status AddRow(const client::sp::shared_ptr<kudu::client::KuduTable>& table,
+ const kudu::client::KuduSchema& table_schema,
+ const kudu::client::KuduScanBatch::RowPtr& src_row,
+ const client::sp::shared_ptr<kudu::client::KuduSession>& session);
+
AtomicInt<uint64_t> total_count_;
+ boost::optional<kudu::client::KuduScanner::ReadMode> mode_;
client::sp::shared_ptr<kudu::client::KuduClient> client_;
std::string table_name_;
+ boost::optional<client::sp::shared_ptr<kudu::client::KuduClient>> dst_client_;
+ boost::optional<std::string> dst_table_name_;
gscoped_ptr<ThreadPool> thread_pool_;
// Protects output to 'out_' so that rows don't get interleaved.
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index 09ad010..7fdd41f 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -476,13 +476,19 @@ bool MatchesAnyPattern(const vector<string>& patterns, const string& str) {
}
Status CreateKuduClient(const RunnerContext& context,
+ const char* master_addresses_arg,
client::sp::shared_ptr<KuduClient>* client) {
const string& master_addresses_str = FindOrDie(context.required_args,
- kMasterAddressesArg);
+ master_addresses_arg);
vector<string> master_addresses = Split(master_addresses_str, ",");
return KuduClientBuilder()
- .master_server_addrs(master_addresses)
- .Build(client);
+ .master_server_addrs(master_addresses)
+ .Build(client);
+}
+
+Status CreateKuduClient(const RunnerContext& context,
+ client::sp::shared_ptr<KuduClient>* client) {
+ return CreateKuduClient(context, kMasterAddressesArg, client);
}
Status PrintServerStatus(const string& address, uint16_t default_port) {
diff --git a/src/kudu/tools/tool_action_common.h b/src/kudu/tools/tool_action_common.h
index 20be6ac..0682107 100644
--- a/src/kudu/tools/tool_action_common.h
+++ b/src/kudu/tools/tool_action_common.h
@@ -144,6 +144,12 @@ Status DumpMemTrackers(const std::string& address, uint16_t default_port);
// 'patterns' is empty.
bool MatchesAnyPattern(const std::vector<std::string>& patterns, const std::string& str);
+// Creates a Kudu client connected to the cluster whose master addresses are specified by
+// 'master_addresses_arg'
+Status CreateKuduClient(const RunnerContext& context,
+ const char* master_addresses_arg,
+ client::sp::shared_ptr<client::KuduClient>* client);
+
// Creates a Kudu client connected to the cluster whose master addresses are defined by
// the kMasterAddressesArg argument in 'context'.
Status CreateKuduClient(const RunnerContext& context,
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 49b83fa..85f96cf 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -574,7 +574,7 @@ Status CountTableRows(const shared_ptr<KuduClient>& client,
const string& table_name, uint64_t* count) {
TableScanner scanner(client, table_name);
scanner.SetReadMode(KuduScanner::ReadMode::READ_YOUR_WRITES);
- RETURN_NOT_OK(scanner.Run());
+ RETURN_NOT_OK(scanner.StartScan());
if (count != nullptr) {
*count = scanner.TotalScannedCount();
}
@@ -695,7 +695,7 @@ Status TableScan(const RunnerContext &context) {
FLAGS_show_values = false;
TableScanner scanner(client, table_name);
scanner.SetOutput(&cout);
- return scanner.Run();
+ return scanner.StartScan();
}
} // anonymous namespace
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index 36ec755..5854858 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -23,6 +23,7 @@
#include <unordered_map>
#include <vector>
+#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <gflags/gflags_declare.h>
#include <rapidjson/document.h>
@@ -72,6 +73,9 @@ DEFINE_bool(check_row_existence, false,
"the tablet. If found, the full row will be printed; if not found, "
"an error message will be printed and the command will return a "
"non-zero status.");
+DEFINE_string(dst_table, "",
+ "The name of the destination table the data will be copied to. "
+ "If the empty string, use the same name as the source table.");
DEFINE_bool(list_tablets, false,
"Include tablet and replica UUIDs in the output");
DEFINE_bool(modify_external_catalogs, true,
@@ -134,6 +138,7 @@ const char* const kNewTableNameArg = "new_table_name";
const char* const kColumnNameArg = "column_name";
const char* const kNewColumnNameArg = "new_column_name";
const char* const kKeyArg = "primary_key";
+const char* const kDestMasterAddressesArg = "dest_master_addresses";
Status DeleteTable(const RunnerContext& context) {
const string& table_name = FindOrDie(context.required_args, kTableNameArg);
@@ -391,7 +396,27 @@ Status ScanTable(const RunnerContext &context) {
FLAGS_show_values = true;
TableScanner scanner(client, table_name);
scanner.SetOutput(&cout);
- return scanner.Run();
+ return scanner.StartScan();
+}
+
+Status CopyTable(const RunnerContext& context) {
+ client::sp::shared_ptr<KuduClient> src_client;
+ RETURN_NOT_OK(CreateKuduClient(context, &src_client));
+ const string& src_table_name = FindOrDie(context.required_args, kTableNameArg);
+
+ client::sp::shared_ptr<KuduClient> dst_client;
+ if (FindOrDie(context.required_args, kMasterAddressesArg)
+ == FindOrDie(context.required_args, kDestMasterAddressesArg)) {
+ dst_client = src_client;
+ } else {
+ RETURN_NOT_OK(CreateKuduClient(context, kDestMasterAddressesArg, &dst_client));
+ }
+
+ const string& dst_table_name = FLAGS_dst_table.empty() ? src_table_name : FLAGS_dst_table;
+
+ TableScanner scanner(src_client, src_table_name, dst_client, dst_table_name);
+ scanner.SetOutput(&cout);
+ return scanner.StartCopy();
}
} // anonymous namespace
@@ -470,6 +495,29 @@ unique_ptr<Mode> BuildTableMode() {
.AddOptionalParameter("tablets")
.Build();
+ unique_ptr<Action> copy_table =
+ ActionBuilder("copy", &CopyTable)
+ .Description("Copy table data to another table")
+ .ExtraDescription("Copy table data to another table; the two tables could be in the same "
+ "cluster or not. The two tables must have the same table schema, but "
+ "could have different partition schemas. Alternatively, the tool can "
+ "create the new table using the same table and partition schema as the "
+ "source table.")
+ .AddRequiredParameter({ kMasterAddressesArg,
+ "Comma-separated list of Kudu Master addresses (source) "
+ "where each address is of form 'hostname:port'" })
+ .AddRequiredParameter({ kTableNameArg, "Name of the source table" })
+ .AddRequiredParameter({ kDestMasterAddressesArg,
+ "Comma-separated list of Kudu Master addresses (destination) "
+ "where each address is of form 'hostname:port'" })
+ .AddOptionalParameter("create_table")
+ .AddOptionalParameter("dst_table")
+ .AddOptionalParameter("num_threads")
+ .AddOptionalParameter("predicates")
+ .AddOptionalParameter("tablets")
+ .AddOptionalParameter("write_type")
+ .Build();
+
return ModeBuilder("table")
.Description("Operate on Kudu tables")
.AddAction(std::move(delete_table))
@@ -479,6 +527,7 @@ unique_ptr<Mode> BuildTableMode() {
.AddAction(std::move(rename_column))
.AddAction(std::move(rename_table))
.AddAction(std::move(scan_table))
+ .AddAction(std::move(copy_table))
.Build();
}