This is an automated email from the ASF dual-hosted git repository. wdberkeley pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 0afeddf9e530762e0e47beb7428982763715c746 Author: Yingchun Lai <405403...@qq.com> AuthorDate: Sat Jan 5 03:41:50 2019 -0500 [tools] Add table scan tool This commit adds a basic tool to scan rows from a table. Several predicates can be specified in the query. Unlike traditional SQL syntax, the scan tool's query predicates are expressed in a simple JSON syntax. Three types of predicates are supported: 'Comparison', 'InList' and 'IsNull'. * The 'Comparison' type supports <=, <, =, > and >=, and is represented as '[operator, column_name, value]', e.g. '[">=", "col1", "value"]'. Equality is written with a single '=' ('==' is not accepted). * The 'InList' type is represented as '["IN", column_name, [value1, value2, ...]]', e.g. '["IN", "col2", ["value1", "value2"]]'. * The 'IsNull' type determines whether a value is NULL or not, and is represented as '[operator, column_name]', e.g. '["NULL", "col1"]' or '["NOTNULL", "col2"]'. Predicates can be combined with predicate operators using the syntax [operator, predicate, predicate, ..., predicate]. For example, ["AND", [">=", "col1", "value"], ["NOTNULL", "col2"]] The only supported predicate operator is `AND`, and the top-level expression must always be an 'AND' list, even when it holds a single predicate, e.g. ["AND", ["=", "key", 1]]. 
Change-Id: Ieac340b70a9eaf131f82a2b7d61336211d1d48f8 Reviewed-on: http://gerrit.cloudera.org:8080/12167 Tested-by: Kudu Jenkins Reviewed-by: Will Berkeley <wdberke...@gmail.com> --- src/kudu/client/scanner-internal.h | 3 + src/kudu/tools/CMakeLists.txt | 1 + src/kudu/tools/kudu-tool-test.cc | 185 +++++++++++++++++ src/kudu/tools/table_scanner.cc | 380 ++++++++++++++++++++++++++++++++++ src/kudu/tools/table_scanner.h | 67 ++++++ src/kudu/tools/tool_action_cluster.cc | 4 +- src/kudu/tools/tool_action_common.cc | 6 + src/kudu/tools/tool_action_perf.cc | 4 +- src/kudu/tools/tool_action_table.cc | 73 +++++-- 9 files changed, 694 insertions(+), 29 deletions(-) diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h index 5e7652b..7b233bf 100644 --- a/src/kudu/client/scanner-internal.h +++ b/src/kudu/client/scanner-internal.h @@ -311,6 +311,9 @@ class KuduScanBatch::Data { << row_format_flags_; DCHECK_GE(idx, 0); DCHECK_LT(idx, num_rows()); + if (direct_data_.empty()) { + return KuduRowResult(projection_, nullptr); + } int offset = idx * projected_row_size_; return KuduRowResult(projection_, &direct_data_[offset]); } diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt index 1be02ca..ebe894c 100644 --- a/src/kudu/tools/CMakeLists.txt +++ b/src/kudu/tools/CMakeLists.txt @@ -42,6 +42,7 @@ add_library(kudu_tools_util color.cc data_gen_util.cc diagnostics_log_parser.cc + table_scanner.cc tool_action.cc tool_action_common.cc ) diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc index b8fd9d6..1c0633f 100644 --- a/src/kudu/tools/kudu-tool-test.cc +++ b/src/kudu/tools/kudu-tool-test.cc @@ -47,6 +47,7 @@ #include "kudu/client/client.h" #include "kudu/client/schema.h" #include "kudu/client/shared_ptr.h" +#include "kudu/client/write_op.h" #include "kudu/common/common.pb.h" #include "kudu/common/partial_row.h" #include "kudu/common/partition.h" @@ -140,10 +141,12 @@ using 
kudu::cfile::StringDataGenerator; using kudu::cfile::WriterOptions; using kudu::client::KuduClient; using kudu::client::KuduClientBuilder; +using kudu::client::KuduInsert; using kudu::client::KuduScanToken; using kudu::client::KuduScanTokenBuilder; using kudu::client::KuduSchema; using kudu::client::KuduSchemaBuilder; +using kudu::client::KuduSession; using kudu::client::KuduTable; using kudu::client::sp::shared_ptr; using kudu::cluster::ExternalMiniCluster; @@ -181,9 +184,11 @@ using std::back_inserter; using std::copy; using std::make_pair; using std::map; +using std::max; using std::ostringstream; using std::pair; using std::string; +using std::to_string; using std::unique_ptr; using std::unordered_map; using std::unordered_set; @@ -378,6 +383,41 @@ class ToolTest : public KuduTest { return Status::OK(); } + void RunScanTableCheck(const string& table_name, + const string& predicates_json, + int64_t min_value, + int64_t max_value, + const vector<pair<string, string>>& columns = {{"int32", "key"}}) { + vector<string> col_names; + for (const auto& column : columns) { + col_names.push_back(column.second); + } + const string projection = JoinStrings(col_names, ","); + + vector<string> lines; + int64_t total = max(max_value - min_value + 1, 0L); + NO_FATALS(RunActionStdoutLines( + Substitute("table scan $0 $1 -show_value=true " + "-columns=$2 -predicates=$3", + cluster_->master()->bound_rpc_addr().ToString(), + table_name, projection, predicates_json), &lines)); + for (int64_t value = min_value; value <= max_value; ++value) { + // Check projection. + vector<string> kvs; + for (const auto& column : columns) { + // Check matched rows. + kvs.push_back(Substitute("$0 $1=$2", + column.first, column.second, column.second == "key" ? to_string(value) : ".*")); + } + string line_pattern(R"*(\()*"); + line_pattern += JoinStrings(kvs, ", "); + line_pattern += (")"); + ASSERT_STRINGS_ANY_MATCH(lines, line_pattern); + } + // Check total count. 
+ ASSERT_STRINGS_ANY_MATCH(lines, Substitute("Total count $0 ", total)); + } + protected: void RunLoadgen(int num_tservers = 1, const vector<string>& tool_args = {}, @@ -589,6 +629,7 @@ TEST_F(ToolTest, TestModeHelp) { "rename_table.*Rename a table", "rename_column.*Rename a column", "list.*List tables", + "scan.*Scan rows from a table", }; NO_FATALS(RunTestHelp("table", kTableModeRegexes)); } @@ -2130,6 +2171,7 @@ TEST_F(ToolTest, TestMasterList) { // (2)rename a table // (3)rename a column // (4)list tables +// (5)scan a table TEST_F(ToolTest, TestDeleteTable) { NO_FATALS(StartExternalMiniCluster()); shared_ptr<KuduClient> client; @@ -2311,6 +2353,149 @@ TEST_F(ToolTest, TestListTables) { } } +TEST_F(ToolTest, TestScanTablePredicates) { + NO_FATALS(StartExternalMiniCluster()); + string master_addr = cluster_->master()->bound_rpc_addr().ToString(); + + const string kTableName = "kudu.table.scan.predicates"; + + // Create the src table and write some data to it. + TestWorkload ww(cluster_.get()); + ww.set_table_name(kTableName); + ww.set_num_replicas(1); + ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS); + ww.set_num_write_threads(1); + ww.Setup(); + ww.Start(); + ASSERT_EVENTUALLY([&]() { + ASSERT_GE(ww.rows_inserted(), 10); + }); + ww.StopAndJoin(); + int64_t total_rows = ww.rows_inserted(); + + // Insert one more row with a NULL value column. + shared_ptr<KuduClient> client; + ASSERT_OK(cluster_->CreateClient(nullptr, &client)); + shared_ptr<KuduSession> session = client->NewSession(); + session->SetTimeoutMillis(20000); + ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + shared_ptr<KuduTable> table; + ASSERT_OK(client->OpenTable(kTableName, &table)); + unique_ptr<KuduInsert> insert(table->NewInsert()); + ASSERT_OK(insert->mutable_row()->SetInt32("key", ++total_rows)); + ASSERT_OK(insert->mutable_row()->SetInt32("int_val", 1)); + ASSERT_OK(session->Apply(insert.release())); + ASSERT_OK(session->Flush()); + + // Check predicates. 
+ RunScanTableCheck(kTableName, "", 1, total_rows); + RunScanTableCheck(kTableName, R"*(["AND",["=","key",1]])*", 1, 1); + int64_t mid = total_rows / 2; + RunScanTableCheck(kTableName, + Substitute(R"*(["AND",[">","key",$0]])*", mid), + mid + 1, total_rows); + RunScanTableCheck(kTableName, + Substitute(R"*(["AND",[">=","key",$0]])*", mid), + mid, total_rows); + RunScanTableCheck(kTableName, + Substitute(R"*(["AND",["<","key",$0]])*", mid), + 1, mid - 1); + RunScanTableCheck(kTableName, + Substitute(R"*(["AND",["<=","key",$0]])*", mid), + 1, mid); + RunScanTableCheck(kTableName, + R"*(["AND",["IN","key",[1,2,3,4,5]]])*", + 1, 5); + RunScanTableCheck(kTableName, + R"*(["AND",["NOTNULL","string_val"]])*", + 1, total_rows - 1); + RunScanTableCheck(kTableName, + R"*(["AND",["NULL","string_val"]])*", + total_rows, total_rows); + RunScanTableCheck(kTableName, + R"*(["AND",["IN","key",[0,1,2,3]],)*" + R"*(["<","key",8],[">=","key",1],["NOTNULL","key"],)*" + R"*(["NOTNULL","string_val"]])*", + 1, 3); +} + +TEST_F(ToolTest, TestScanTableProjection) { + NO_FATALS(StartExternalMiniCluster()); + string master_addr = cluster_->master()->bound_rpc_addr().ToString(); + + const string kTableName = "kudu.table.scan.projection"; + + // Create the src table and write some data to it. + TestWorkload ww(cluster_.get()); + ww.set_table_name(kTableName); + ww.set_num_replicas(1); + ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS); + ww.set_num_write_threads(1); + ww.Setup(); + ww.Start(); + ASSERT_EVENTUALLY([&]() { + ASSERT_GE(ww.rows_inserted(), 10); + }); + ww.StopAndJoin(); + + // Check projections. 
+ string one_row_json = R"*(["AND",["=","key",1]])*"; + RunScanTableCheck(kTableName, one_row_json, 1, 1, {}); + RunScanTableCheck(kTableName, one_row_json, 1, 1, {{"int32", "key"}}); + RunScanTableCheck(kTableName, one_row_json, 1, 1, {{"string", "string_val"}}); + RunScanTableCheck(kTableName, one_row_json, 1, 1, {{"int32", "key"}, + {"string", "string_val"}}); + RunScanTableCheck(kTableName, one_row_json, 1, 1, {{"int32", "key"}, + {"int32", "int_val"}, + {"string", "string_val"}}); +} + +TEST_F(ToolTest, TestScanTableMultiPredicates) { + NO_FATALS(StartExternalMiniCluster()); + string master_addr = cluster_->master()->bound_rpc_addr().ToString(); + + const string kTableName = "kudu.table.scan.multipredicates"; + + // Create the src table and write some data to it. + TestWorkload ww(cluster_.get()); + ww.set_table_name(kTableName); + ww.set_num_replicas(1); + ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS); + ww.set_num_write_threads(1); + ww.Setup(); + ww.Start(); + ASSERT_EVENTUALLY([&]() { + ASSERT_GE(ww.rows_inserted(), 1000); + }); + ww.StopAndJoin(); + int64_t total_rows = ww.rows_inserted(); + int64_t mid = total_rows / 2; + + vector<string> lines; + NO_FATALS(RunActionStdoutLines( + Substitute("table scan $0 $1 -show_value=true " + "-columns=key,string_val -predicates=$2", + cluster_->master()->bound_rpc_addr().ToString(), + kTableName, + Substitute(R"*(["AND",[">","key",$0],)*" + R"*(["<=","key",$1],)*" + R"*([">=","string_val","a"],)*" + R"*(["<","string_val","b"]])*", mid, total_rows)), + &lines)); + for (auto line : lines) { + size_t pos1 = line.find("(int64 key="); + if (pos1 != string::npos) { + size_t pos2 = line.find(", string string_val=a", pos1); + ASSERT_NE(pos2, string::npos); + int32_t key; + ASSERT_TRUE(safe_strto32(line.substr(pos1, pos2).c_str(), &key)); + ASSERT_GT(key, mid); + ASSERT_LE(key, total_rows); + } + } + ASSERT_LE(lines.size(), mid); +} + Status CreateLegacyHmsTable(HmsClient* client, const string& 
hms_database_name, const string& hms_table_name, diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc new file mode 100644 index 0000000..a2f2e7b --- /dev/null +++ b/src/kudu/tools/table_scanner.cc @@ -0,0 +1,380 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/tools/table_scanner.h" + +#include <stddef.h> + +#include <iostream> +#include <map> +#include <memory> +#include <set> + +#include <boost/bind.hpp> +#include <boost/optional/optional.hpp> +#include <gflags/gflags.h> +#include <gflags/gflags_declare.h> +#include <glog/logging.h> +#include <rapidjson/document.h> + +#include "kudu/client/client.h" +#include "kudu/client/scan_batch.h" +#include "kudu/client/scan_predicate.h" +#include "kudu/client/schema.h" +#include "kudu/client/value.h" +#include "kudu/common/column_predicate.h" +#include "kudu/common/schema.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/split.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/jsonreader.h" +#include "kudu/util/monotime.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/string_case.h" + +using kudu::client::KuduColumnSchema; +using kudu::client::KuduPredicate; +using kudu::client::KuduScanBatch; +using kudu::client::KuduScanner; +using kudu::client::KuduScanTokenBuilder; +using kudu::client::KuduSchema; +using kudu::client::KuduTable; +using kudu::client::KuduValue; +using strings::Substitute; +using std::cout; +using std::endl; +using std::map; +using std::set; +using std::unique_ptr; + +DECLARE_string(columns); +DEFINE_bool(fill_cache, true, + "Whether to fill block cache when scanning."); +DECLARE_int32(num_threads); + +DEFINE_string(predicates, "", + "Query predicates on columns. Unlike traditional SQL syntax, " + "the scan tool's simple query predicates are represented in a " + "simple JSON syntax. Three types of predicates are supported, " + "including 'Comparison', 'InList' and 'IsNull'.\n" + " * The 'Comparison' type support <=, <, ==, > and >=,\n" + " which can be represented as '[operator, column_name, value]',""\n" + R"*( e.g. 
'[">=", "col1", "value"]')*""\n" + " * The 'InList' type can be represented as\n" + R"*( '["IN", column_name, [value1, value2, ...]]')*""\n" + R"*( e.g. '["IN", "col2", ["value1", "value2"]]')*""\n" + " * The 'IsNull' type determine whether the value is NULL or not,\n" + " which can be represented as '[operator, column_name]'\n" + R"*( e.g. '["NULL", "col1"]', or '["NOTNULL", "col2"]')*""\n" + "Predicates can be combined together with predicate operators using the syntax\n" + " [operator, predicate, predicate, ..., predicate].\n" + "For example,\n" + R"*( ["AND", [">=", "col1", "value"], ["NOTNULL", "col2"]])*""\n" + "The only supported predicate operator is `AND`."); +DEFINE_bool(show_value, false, + "Whether to show values of scanned rows."); +DECLARE_string(tablets); + +namespace kudu { +namespace tools { + +PredicateType ParsePredicateType(const string& predicate_type) { + string predicate_type_uc; + ToUpperCase(predicate_type, &predicate_type_uc); + if (predicate_type_uc == "=") { + return PredicateType::Equality; + } else if (predicate_type_uc == "<" || + predicate_type_uc == "<=" || + predicate_type_uc == ">" || + predicate_type_uc == ">=") { + return PredicateType::Range; + } else if (predicate_type_uc == "NULL") { + return PredicateType::IsNull; + } else if (predicate_type_uc == "NOTNULL") { + return PredicateType::IsNotNull; + } else if (predicate_type_uc == "IN") { + return PredicateType::InList; + } else { + LOG(FATAL) << Substitute("unhandled predicate type $0", predicate_type); + return PredicateType::None; + } +} + +KuduValue* ParseValue(KuduColumnSchema::DataType type, + const rapidjson::Value* value) { + CHECK(value != nullptr); + switch (type) { + case KuduColumnSchema::DataType::INT8: + case KuduColumnSchema::DataType::INT16: + case KuduColumnSchema::DataType::INT32: + CHECK(value->IsInt()); + return KuduValue::FromInt(value->GetInt()); + case KuduColumnSchema::DataType::INT64: + CHECK(value->IsInt64()); + return 
KuduValue::FromInt(value->GetInt64()); + case KuduColumnSchema::DataType::STRING: + CHECK(value->IsString()); + return KuduValue::CopyString(value->GetString()); + case KuduColumnSchema::DataType::BOOL: + CHECK(value->IsBool()); + return KuduValue::FromBool(value->GetBool()); + case KuduColumnSchema::DataType::FLOAT: + CHECK(value->IsDouble()); + return KuduValue::FromFloat(static_cast<float>(value->GetDouble())); + case KuduColumnSchema::DataType::DOUBLE: + CHECK(value->IsDouble()); + return KuduValue::FromDouble(value->GetDouble()); + default: + LOG(FATAL) << Substitute("unhandled data type $0", type); + } + + return nullptr; +} + +KuduPredicate* NewComparisonPredicate(const client::sp::shared_ptr<KuduTable>& table, + KuduColumnSchema::DataType type, + const string& predicate_type, + const string& column_name, + const rapidjson::Value* value) { + KuduValue* kudu_value = ParseValue(type, value); + CHECK(kudu_value != nullptr); + client::KuduPredicate::ComparisonOp cop; + if (predicate_type == "<") { + cop = client::KuduPredicate::ComparisonOp::LESS; + } else if (predicate_type == "<=") { + cop = client::KuduPredicate::ComparisonOp::LESS_EQUAL; + } else if (predicate_type == "=") { + cop = client::KuduPredicate::ComparisonOp::EQUAL; + } else if (predicate_type == ">") { + cop = client::KuduPredicate::ComparisonOp::GREATER; + } else if (predicate_type == ">=") { + cop = client::KuduPredicate::ComparisonOp::GREATER_EQUAL; + } else { + return nullptr; + } + return table->NewComparisonPredicate(column_name, cop, kudu_value); +} + +KuduPredicate* NewIsNullPredicate(const client::sp::shared_ptr<KuduTable>& table, + const string& column_name, + PredicateType pt) { + switch (pt) { + case PredicateType::IsNotNull: + return table->NewIsNotNullPredicate(column_name); + case PredicateType::IsNull: + return table->NewIsNullPredicate(column_name); + default: + return nullptr; + } +} + +KuduPredicate* NewInListPredicate(const client::sp::shared_ptr<KuduTable> &table, + 
KuduColumnSchema::DataType type, + const string &name, + const JsonReader &reader, + const rapidjson::Value *object) { + CHECK(object->IsArray()); + vector<const rapidjson::Value*> values; + reader.ExtractObjectArray(object, nullptr, &values); + vector<KuduValue *> kudu_values; + for (const auto& value : values) { + kudu_values.emplace_back(ParseValue(type, value)); + } + return table->NewInListPredicate(name, &kudu_values); +} + +Status AddPredicate(const client::sp::shared_ptr<KuduTable>& table, + const string& predicate_type, + const string& column_name, + const boost::optional<const rapidjson::Value*>& value, + const JsonReader& reader, + KuduScanTokenBuilder& builder) { + if (predicate_type.empty() || column_name.empty()) { + return Status::OK(); + } + + Schema schema_internal = KuduSchema::ToSchema(table->schema()); + int idx = schema_internal.find_column(column_name); + if (PREDICT_FALSE(idx == Schema::kColumnNotFound)) { + return Status::NotFound("no such column", column_name); + } + auto type = table->schema().Column(static_cast<size_t>(idx)).type(); + KuduPredicate* predicate = nullptr; + PredicateType pt = ParsePredicateType(predicate_type); + switch (pt) { + case PredicateType::Equality: + case PredicateType::Range: + CHECK(value); + predicate = NewComparisonPredicate(table, type, predicate_type, column_name, value.get()); + break; + case PredicateType::IsNotNull: + case PredicateType::IsNull: + CHECK(!value); + predicate = NewIsNullPredicate(table, column_name, pt); + break; + case PredicateType::InList: { + CHECK(value); + predicate = NewInListPredicate(table, type, column_name, reader, value.get()); + break; + } + default: + return Status::NotSupported(Substitute("not support predicate_type $0", predicate_type)); + } + CHECK(predicate); + RETURN_NOT_OK(builder.AddConjunctPredicate(predicate)); + + return Status::OK(); +} + +Status AddPredicates(const client::sp::shared_ptr<KuduTable>& table, + KuduScanTokenBuilder& builder) { + if 
(FLAGS_predicates.empty()) { + return Status::OK(); + } + JsonReader reader(FLAGS_predicates); + RETURN_NOT_OK(reader.Init()); + vector<const rapidjson::Value*> predicate_objects; + RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(), + nullptr, + &predicate_objects)); + vector<unique_ptr<KuduPredicate>> predicates; + for (int i = 0; i < predicate_objects.size(); ++i) { + if (i == 0) { + CHECK(predicate_objects[i]->IsString()); + string op; + ToUpperCase(predicate_objects[i]->GetString(), &op); + if (op != "AND") { + return Status::InvalidArgument(Substitute("only 'AND' operator is supported now")); + } + continue; + } + + CHECK(predicate_objects[i]->IsArray()); + vector<const rapidjson::Value*> elements; + reader.ExtractObjectArray(predicate_objects[i], nullptr, &elements); + if (elements.size() == 2 || elements.size() == 3) { + CHECK(elements[0]->IsString()); + CHECK(elements[1]->IsString()); + RETURN_NOT_OK(AddPredicate(table, + elements[0]->GetString(), + elements[1]->GetString(), + elements.size() == 2 ? 
+ boost::none : boost::optional<const rapidjson::Value*>(elements[2]), + reader, + builder)); + } else { + return Status::InvalidArgument( + Substitute("invalid predicate elements count $0", elements.size())); + } + } + + return Status::OK(); +} + +void TableScanner::ScannerTask(const vector<KuduScanToken *>& tokens) { + for (auto token : tokens) { + Stopwatch sw(Stopwatch::THIS_THREAD); + sw.start(); + + KuduScanner* scanner; + CHECK_OK(token->IntoKuduScanner(&scanner)); + CHECK_OK(scanner->Open()); + + uint64_t count = 0; + while (scanner->HasMoreRows()) { + KuduScanBatch batch; + CHECK_OK(scanner->NextBatch(&batch)); + count += batch.NumRows(); + total_count_.IncrementBy(batch.NumRows()); + if (FLAGS_show_value) { + for (const auto& row : batch) { + cout << row.ToString() << endl; + } + } + } + delete scanner; + + sw.stop(); + cout << "T " << token->tablet().id() << " scanned count " << count + << " cost " << sw.elapsed().wall_seconds() << " seconds" << endl; + } +} + +void TableScanner::MonitorTask() { + MonoTime last_log_time = MonoTime::Now(); + while (thread_pool_->num_threads() > 1) { // Some other table scan thread is running. 
+ if (MonoTime::Now() - last_log_time >= MonoDelta::FromSeconds(5)) { + LOG(INFO) << "Scanned count: " << total_count_.Load() << endl; + last_log_time = MonoTime::Now(); + } + SleepFor(MonoDelta::FromMilliseconds(100)); + } +} + +Status TableScanner::Run() { + client::sp::shared_ptr<KuduTable> table; + RETURN_NOT_OK(client_->OpenTable(table_name_, &table)); + + KuduScanTokenBuilder builder(table.get()); + RETURN_NOT_OK(builder.SetCacheBlocks(FLAGS_fill_cache)); + RETURN_NOT_OK(builder.SetSelection(KuduClient::LEADER_ONLY)); + RETURN_NOT_OK(builder.SetReadMode(KuduScanner::READ_LATEST)); + RETURN_NOT_OK(builder.SetTimeoutMillis(30000)); + + vector<string> projected_column_names = Split(FLAGS_columns, ",", strings::SkipEmpty()); + RETURN_NOT_OK(builder.SetProjectedColumnNames(projected_column_names)); + RETURN_NOT_OK(AddPredicates(table, builder)); + + vector<KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + RETURN_NOT_OK(builder.Build(&tokens)); + + const set<string>& tablet_id_filters = Split(FLAGS_tablets, ",", strings::SkipWhitespace()); + map<int, vector<KuduScanToken*>> thread_tokens; + int i = 0; + for (auto token : tokens) { + if (tablet_id_filters.empty() || ContainsKey(tablet_id_filters, token->tablet().id())) { + thread_tokens[i++ % FLAGS_num_threads].push_back(token); + } + } + + RETURN_NOT_OK(ThreadPoolBuilder("table_scan_pool") + .set_max_threads(FLAGS_num_threads + 1) // add extra 1 thread for MonitorTask + .set_idle_timeout(MonoDelta::FromMilliseconds(1)) + .Build(&thread_pool_)); + + Stopwatch sw(Stopwatch::THIS_THREAD); + sw.start(); + for (i = 0; i < FLAGS_num_threads; ++i) { + RETURN_NOT_OK(thread_pool_->SubmitFunc( + boost::bind(&TableScanner::ScannerTask, this, thread_tokens[i]))); + } + RETURN_NOT_OK(thread_pool_->SubmitFunc(boost::bind(&TableScanner::MonitorTask, this))); + thread_pool_->Wait(); + thread_pool_->Shutdown(); + + sw.stop(); + cout << "Total count " << total_count_.Load() + << " cost " << sw.elapsed().wall_seconds() << 
" seconds" << endl; + + return Status::OK(); +} + +} // namespace tools +} // namespace kudu diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h new file mode 100644 index 0000000..c9589c5 --- /dev/null +++ b/src/kudu/tools/table_scanner.h @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <stdint.h> + +#include <string> +#include <utility> +#include <vector> + +#include "kudu/client/shared_ptr.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/util/atomic.h" +#include "kudu/util/status.h" +#include "kudu/util/threadpool.h" + +namespace kudu { +namespace client { +class KuduClient; +class KuduScanToken; +} // namespace client +} // namespace kudu + +using kudu::client::KuduClient; +using kudu::client::KuduScanToken; +using std::string; +using std::vector; + +namespace kudu { +namespace tools { +class TableScanner { +public: + TableScanner(client::sp::shared_ptr<KuduClient> client, string table_name): + total_count_(0), + client_(std::move(client)), + table_name_(std::move(table_name)) { + } + + Status Run(); + +private: + void ScannerTask(const vector<KuduScanToken *>& tokens); + void MonitorTask(); + +private: + AtomicInt<uint64_t> total_count_; + client::sp::shared_ptr<KuduClient> client_; + std::string table_name_; + gscoped_ptr<ThreadPool> thread_pool_; +}; +} // namespace tools +} // namespace kudu diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc index 6a369f8..59d5f00 100644 --- a/src/kudu/tools/tool_action_cluster.cc +++ b/src/kudu/tools/tool_action_cluster.cc @@ -64,9 +64,7 @@ using strings::Substitute; } while (0); DECLARE_string(tables); -DEFINE_string(tablets, "", - "Tablets to check (comma-separated list of IDs) " - "If not specified, checks all tablets."); +DECLARE_string(tablets); DEFINE_string(sections, "*", "Sections to print (comma-separated list of sections, " diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc index 472cb30..24ab535 100644 --- a/src/kudu/tools/tool_action_common.cc +++ b/src/kudu/tools/tool_action_common.cc @@ -96,6 +96,9 @@ DEFINE_string(print_entries, "decoded", " id = print only their ids"); DEFINE_string(table_name, "", "Restrict output to a specific table by name"); +DEFINE_string(tablets, "", + 
"Tablets to check (comma-separated list of IDs) " + "If not specified, checks all tablets."); DEFINE_int64(timeout_ms, 1000 * 60, "RPC timeout in milliseconds"); DEFINE_int32(truncate_data, 100, "Truncate the data fields to the given number of bytes " @@ -116,6 +119,9 @@ DEFINE_string(tables, "", "Tables to include (comma-separated list of table name DEFINE_string(memtracker_output, "table", "One of 'json', 'json_compact' or 'table'. Table output flattens " "the memtracker hierarchy."); +DEFINE_int32(num_threads, 2, + "Number of threads to run. Each thread runs its own " + "KuduSession."); namespace boost { template <typename Signature> diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc index e86e5c0..fdd3d94 100644 --- a/src/kudu/tools/tool_action_perf.cc +++ b/src/kudu/tools/tool_action_perf.cc @@ -264,9 +264,7 @@ DEFINE_uint64(num_rows_per_thread, 1000, "Number of rows each thread generates and inserts; " "0 means unlimited. All rows generated by a thread are inserted " "in the context of the same session."); -DEFINE_int32(num_threads, 2, - "Number of generator threads to run. Each thread runs its own " - "KuduSession."); +DECLARE_int32(num_threads); DEFINE_bool(run_scan, false, "Whether to run post-insertion scan to verify that the count of " "the inserted rows matches the expected number. 
If enabled, " diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc index 0519c88..a0c3f06 100644 --- a/src/kudu/tools/tool_action_table.cc +++ b/src/kudu/tools/tool_action_table.cc @@ -41,46 +41,47 @@ #include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/split.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/tools/table_scanner.h" #include "kudu/tools/tool_action.h" #include "kudu/tools/tool_action_common.h" #include "kudu/util/jsonreader.h" #include "kudu/util/status.h" -DECLARE_string(tables); +using kudu::client::KuduClient; +using kudu::client::KuduClientBuilder; +using kudu::client::KuduColumnSchema; +using kudu::client::KuduPredicate; +using kudu::client::KuduScanToken; +using kudu::client::KuduScanTokenBuilder; +using kudu::client::KuduScanner; +using kudu::client::KuduSchema; +using kudu::client::KuduTable; +using kudu::client::KuduTableAlterer; +using kudu::client::internal::ReplicaController; +using std::cerr; +using std::cout; +using std::endl; +using std::string; +using std::unique_ptr; +using std::vector; +using strings::Split; +using strings::Substitute; + DEFINE_bool(check_row_existence, false, "Also check for the existence of the row on the leader replica of " "the tablet. 
If found, the full row will be printed; if not found, " "an error message will be printed and the command will return a " "non-zero status."); +DEFINE_bool(list_tablets, false, + "Include tablet and replica UUIDs in the output"); DEFINE_bool(modify_external_catalogs, true, "Whether to modify external catalogs, such as the Hive Metastore, " "when renaming or dropping a table."); -DEFINE_bool(list_tablets, false, - "Include tablet and replica UUIDs in the output"); +DECLARE_string(tables); namespace kudu { namespace tools { -using client::KuduClient; -using client::KuduClientBuilder; -using client::KuduColumnSchema; -using client::KuduPredicate; -using client::KuduScanner; -using client::KuduScanToken; -using client::KuduScanTokenBuilder; -using client::KuduSchema; -using client::KuduTable; -using client::KuduTableAlterer; -using client::internal::ReplicaController; -using std::cerr; -using std::cout; -using std::endl; -using std::string; -using std::unique_ptr; -using std::vector; -using strings::Split; -using strings::Substitute; - // This class only exists so that ListTables() can easily be friended by // KuduReplica, KuduReplica::Data, and KuduClientBuilder. class TableLister { @@ -391,6 +392,16 @@ Status ListTables(const RunnerContext& context) { return TableLister::ListTablets(Split(master_addresses_str, ",")); } +Status ScanTable(const RunnerContext &context) { + client::sp::shared_ptr<KuduClient> client; + RETURN_NOT_OK(CreateKuduClient(context, &client)); + + const string& table_name = FindOrDie(context.required_args, kTableNameArg); + + TableScanner scanner(client, table_name); + return scanner.Run(); +} + } // anonymous namespace unique_ptr<Mode> BuildTableMode() { @@ -452,6 +463,21 @@ unique_ptr<Mode> BuildTableMode() { .AddOptionalParameter("modify_external_catalogs") .Build(); + unique_ptr<Action> scan_table = + ActionBuilder("scan", &ScanTable) + .Description("Scan rows from a table") + .ExtraDescription("Scan rows from an existing table. 
See the help " + "for the --predicates flag on how predicates can be specified.") + .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc }) + .AddRequiredParameter({ kTableNameArg, "Name of the table to scan"}) + .AddOptionalParameter("columns") + .AddOptionalParameter("fill_cache") + .AddOptionalParameter("num_threads") + .AddOptionalParameter("predicates") + .AddOptionalParameter("show_value") + .AddOptionalParameter("tablets") + .Build(); + return ModeBuilder("table") .Description("Operate on Kudu tables") .AddAction(std::move(delete_table)) @@ -460,6 +486,7 @@ unique_ptr<Mode> BuildTableMode() { .AddAction(std::move(locate_row)) .AddAction(std::move(rename_column)) .AddAction(std::move(rename_table)) + .AddAction(std::move(scan_table)) .Build(); }