This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 314762d add a tool to create table
314762d is described below
commit 314762d32d0e21167f3148a603d6d912c6c75353
Author: YangSong01 <[email protected]>
AuthorDate: Thu Sep 26 10:56:29 2019 +0800
add a tool to create table
Change-Id: I0bce4733a504f8ef5f024089a16fe3c6f1e493f1
Reviewed-on: http://gerrit.cloudera.org:8080/14306
Reviewed-by: Andrew Wong <[email protected]>
Tested-by: Kudu Jenkins
---
src/kudu/client/schema.cc | 85 ++
src/kudu/client/schema.h | 25 +
src/kudu/tools/CMakeLists.txt | 4 +
src/kudu/tools/create-table-tool-test.cc | 1277 ++++++++++++++++++++++++++++++
src/kudu/tools/kudu-tool-test.cc | 4 +-
src/kudu/tools/tool.proto | 120 +++
src/kudu/tools/tool_action_table.cc | 396 +++++++--
7 files changed, 1848 insertions(+), 63 deletions(-)
diff --git a/src/kudu/client/schema.cc b/src/kudu/client/schema.cc
index 41ab90c..576f032 100644
--- a/src/kudu/client/schema.cc
+++ b/src/kudu/client/schema.cc
@@ -43,6 +43,7 @@
#include "kudu/util/compression/compression.pb.h"
#include "kudu/util/decimal_util.h"
#include "kudu/util/slice.h"
+#include "kudu/util/string_case.h"
DEFINE_bool(show_attributes, false,
"Whether to show column attributes, including column encoding
type, "
@@ -219,6 +220,56 @@ uint16_t KuduColumnTypeAttributes::length() const {
return data_->length;
}
+// Parses 'encoding' (case-insensitively) into an EncodingType. On success
+// '*type' receives the matching value; otherwise '*type' is left untouched
+// and an InvalidArgument status naming the rejected input is returned.
+Status KuduColumnStorageAttributes::StringToEncodingType(
+    const string& encoding,
+    KuduColumnStorageAttributes::EncodingType* type) {
+  // Every accepted (upper-case) spelling paired with its enum value.
+  static const struct {
+    const char* name;
+    KuduColumnStorageAttributes::EncodingType value;
+  } kEncodingNames[] = {
+    { "AUTO_ENCODING", KuduColumnStorageAttributes::AUTO_ENCODING },
+    { "PLAIN_ENCODING", KuduColumnStorageAttributes::PLAIN_ENCODING },
+    { "PREFIX_ENCODING", KuduColumnStorageAttributes::PREFIX_ENCODING },
+    { "RLE", KuduColumnStorageAttributes::RLE },
+    { "DICT_ENCODING", KuduColumnStorageAttributes::DICT_ENCODING },
+    { "BIT_SHUFFLE", KuduColumnStorageAttributes::BIT_SHUFFLE },
+    { "GROUP_VARINT", KuduColumnStorageAttributes::GROUP_VARINT },
+  };
+  string upper;
+  ToUpperCase(encoding, &upper);
+  for (const auto& candidate : kEncodingNames) {
+    if (upper == candidate.name) {
+      *type = candidate.value;
+      return Status::OK();
+    }
+  }
+  return Status::InvalidArgument(Substitute(
+      "encoding type $0 is not supported", encoding));
+}
+
+// Parses 'compression' (case-insensitively) into a CompressionType. On
+// success '*type' receives the matching value; otherwise '*type' is left
+// untouched and an InvalidArgument status naming the rejected input is
+// returned.
+Status KuduColumnStorageAttributes::StringToCompressionType(
+    const string& compression,
+    KuduColumnStorageAttributes::CompressionType* type) {
+  // Every accepted (upper-case) spelling paired with its enum value.
+  static const struct {
+    const char* name;
+    KuduColumnStorageAttributes::CompressionType value;
+  } kCompressionNames[] = {
+    { "DEFAULT_COMPRESSION", KuduColumnStorageAttributes::DEFAULT_COMPRESSION },
+    { "NO_COMPRESSION", KuduColumnStorageAttributes::NO_COMPRESSION },
+    { "SNAPPY", KuduColumnStorageAttributes::SNAPPY },
+    { "LZ4", KuduColumnStorageAttributes::LZ4 },
+    { "ZLIB", KuduColumnStorageAttributes::ZLIB },
+  };
+  string upper;
+  ToUpperCase(compression, &upper);
+  for (const auto& candidate : kCompressionNames) {
+    if (upper == candidate.name) {
+      *type = candidate.value;
+      return Status::OK();
+    }
+  }
+  return Status::InvalidArgument(Substitute(
+      "compression type $0 is not supported", compression));
+}
+
////////////////////////////////////////////////////////////
// KuduColumnSpec
////////////////////////////////////////////////////////////
@@ -633,6 +684,40 @@ string KuduColumnSchema::DataTypeToString(DataType type) {
LOG(FATAL) << "Unhandled type " << type;
}
+// Parses 'type_str' (case-insensitively) into a KuduColumnSchema::DataType.
+// On success '*type' receives the matching value; otherwise '*type' is left
+// untouched and an InvalidArgument status naming the rejected input is
+// returned.
+Status KuduColumnSchema::StringToDataType(
+    const string& type_str, KuduColumnSchema::DataType* type) {
+  // Every accepted (upper-case) spelling paired with its enum value.
+  static const struct {
+    const char* name;
+    KuduColumnSchema::DataType value;
+  } kDataTypeNames[] = {
+    { "INT8", KuduColumnSchema::INT8 },
+    { "INT16", KuduColumnSchema::INT16 },
+    { "INT32", KuduColumnSchema::INT32 },
+    { "INT64", KuduColumnSchema::INT64 },
+    { "STRING", KuduColumnSchema::STRING },
+    { "BOOL", KuduColumnSchema::BOOL },
+    { "FLOAT", KuduColumnSchema::FLOAT },
+    { "DOUBLE", KuduColumnSchema::DOUBLE },
+    { "BINARY", KuduColumnSchema::BINARY },
+    { "UNIXTIME_MICROS", KuduColumnSchema::UNIXTIME_MICROS },
+    { "DECIMAL", KuduColumnSchema::DECIMAL },
+  };
+  string type_uc;
+  ToUpperCase(type_str, &type_uc);
+  for (const auto& candidate : kDataTypeNames) {
+    if (type_uc == candidate.name) {
+      *type = candidate.value;
+      return Status::OK();
+    }
+  }
+  return Status::InvalidArgument(Substitute(
+      "data type $0 is not supported", type_str));
+}
+
KuduColumnSchema::KuduColumnSchema(const string &name,
DataType type,
bool is_nullable,
diff --git a/src/kudu/client/schema.h b/src/kudu/client/schema.h
index ff6b49b..85e39b6 100644
--- a/src/kudu/client/schema.h
+++ b/src/kudu/client/schema.h
@@ -184,6 +184,24 @@ class KUDU_EXPORT KuduColumnStorageAttributes {
/// @return String representation of the storage attributes.
std::string ToString() const;
+  /// Convert the string representation of an encoding type into the
+  /// corresponding enum value.
+  ///
+  /// Matching is case-insensitive. The accepted names are AUTO_ENCODING,
+  /// PLAIN_ENCODING, PREFIX_ENCODING, RLE, DICT_ENCODING, BIT_SHUFFLE and
+  /// GROUP_VARINT.
+  ///
+  /// @param [in] encoding
+  ///   String representation of the column encoding type.
+  /// @param [out] type
+  ///   Enum representation of the column encoding type, converted from
+  ///   @c encoding. Not modified if the conversion fails.
+  /// @return Operation result status: @c Status::InvalidArgument if
+  ///   @c encoding does not name a supported encoding type.
+  static Status StringToEncodingType(const std::string& encoding,
+                                     EncodingType* type);
+
+  /// Convert the string representation of a compression type into the
+  /// corresponding enum value.
+  ///
+  /// Matching is case-insensitive. The accepted names are
+  /// DEFAULT_COMPRESSION, NO_COMPRESSION, SNAPPY, LZ4 and ZLIB.
+  ///
+  /// @param [in] compression
+  ///   String representation of the column compression type.
+  /// @param [out] type
+  ///   Enum representation of the column compression type, converted from
+  ///   @c compression. Not modified if the conversion fails.
+  /// @return Operation result status: @c Status::InvalidArgument if
+  ///   @c compression does not name a supported compression type.
+  static Status StringToCompressionType(const std::string& compression,
+                                        CompressionType* type);
+
private:
EncodingType encoding_;
CompressionType compression_;
@@ -215,6 +233,13 @@ class KUDU_EXPORT KuduColumnSchema {
/// @return String representation of the column data type.
static std::string DataTypeToString(DataType type);
+  /// Convert the string representation of a column data type into the
+  /// corresponding enum value.
+  ///
+  /// Matching is case-insensitive. The accepted names are INT8, INT16,
+  /// INT32, INT64, STRING, BOOL, FLOAT, DOUBLE, BINARY, UNIXTIME_MICROS and
+  /// DECIMAL.
+  ///
+  /// @param [in] type_str
+  ///   String representation of the column data type.
+  /// @param [out] type
+  ///   Enum representation of the column data type, converted from
+  ///   @c type_str. Not modified if the conversion fails.
+  /// @return Operation result status: @c Status::InvalidArgument if
+  ///   @c type_str does not name a supported data type.
+  static Status StringToDataType(const std::string& type_str, DataType* type);
+
+
/// Construct KuduColumnSchema object as a copy of another object.
///
/// @param [in] other
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index 7958789..face315 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -169,6 +169,10 @@ SET_KUDU_TEST_LINK_LIBS(
kudu_tools_test_util
kudu_tools_util
mini_cluster)
+ADD_KUDU_TEST(create-table-tool-test NUM_SHARDS 4 PROCESSORS 3)
+# The test drives the 'kudu' CLI (via RunKuduTool), so build that target first.
+ADD_KUDU_TEST_DEPENDENCIES(create-table-tool-test kudu)
ADD_KUDU_TEST(diagnostics_log_parser-test)
ADD_KUDU_TEST(ksck-test)
ADD_KUDU_TEST(ksck_remote-test
diff --git a/src/kudu/tools/create-table-tool-test.cc b/src/kudu/tools/create-table-tool-test.cc
new file mode 100644
index 0000000..0ec1614
--- /dev/null
+++ b/src/kudu/tools/create-table-tool-test.cc
@@ -0,0 +1,1277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdio>
+#include <map>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/shared_ptr.h"
+#include "kudu/common/partition.h"
+#include "kudu/common/schema.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/mini_cluster_fs_inspector.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/tools/tool_test_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduSchema;
+using kudu::client::KuduTable;
+using kudu::client::sp::shared_ptr;
+
+using kudu::cluster::ExternalMiniCluster;
+using kudu::cluster::ExternalMiniClusterOptions;
+using kudu::itest::MiniClusterFsInspector;
+using kudu::itest::TServerDetails;
+using std::map;
+using std::string;
+using std::unique_ptr;
+using std::unordered_map;
+using std::vector;
+
+namespace kudu {
+namespace tools {
+
+// Fixture that runs the create-table tool against an external mini cluster.
+class CreateTableToolTest : public KuduTest {
+ public:
+  // The fixture owns the TServerDetails objects stored in 'ts_map_'.
+  ~CreateTableToolTest() {
+    STLDeleteValues(&ts_map_);
+  }
+
+  // Shuts down the external cluster, if one was started, before the base
+  // class tears down.
+  void TearDown() OVERRIDE {
+    if (cluster_) {
+      cluster_->Shutdown();
+    }
+    KuduTest::TearDown();
+  }
+
+ protected:
+  // Starts an external mini cluster configured by 'opts' and populates
+  // 'inspect_' and 'ts_map_' for it.
+  void StartExternalMiniCluster(ExternalMiniClusterOptions opts = {});
+
+  unique_ptr<ExternalMiniCluster> cluster_;
+  unordered_map<string, TServerDetails*> ts_map_;
+  unique_ptr<MiniClusterFsInspector> inspect_;
+};
+
+// Starts an ExternalMiniCluster configured by 'opts', then creates the
+// filesystem inspector and the tablet server map for that cluster.
+void CreateTableToolTest::StartExternalMiniCluster(
+    ExternalMiniClusterOptions opts) {
+  cluster_.reset(new ExternalMiniCluster(std::move(opts)));
+  ASSERT_OK(cluster_->Start());
+  inspect_.reset(new MiniClusterFsInspector(cluster_.get()));
+  ASSERT_OK(CreateTabletServerMap(cluster_->master_proxy(0),
+                                  cluster_->messenger(), &ts_map_));
+}
+
+TEST_F(CreateTableToolTest, TestCreateTable) {
+  // Size of the test cluster. This is independent of the replication factor
+  // requested by the JSON specs below (some of which ask for 3 replicas).
+  constexpr auto kNumTabletServers = 4;
+  ExternalMiniClusterOptions opts;
+  opts.num_tablet_servers = kNumTabletServers;
+  NO_FATALS(StartExternalMiniCluster(opts));
+  const string master_addr = cluster_->master()->bound_rpc_addr().ToString();
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(KuduClientBuilder().add_master_server_addr(master_addr)
+                               .Build(&client));
+
+  // Runs 'kudu table create <master> <json_str>' and verifies that the
+  // resulting table has the expected name, schema, partition schema and
+  // extra configuration properties.
+  const auto check_good_input = [&](const string& json_str,
+                                    const string& master,
+                                    const string& table_name,
+                                    const string& schema,
+                                    const string& partition,
+                                    const map<string, string>& extra_configs,
+                                    KuduClient* kudu_client) {
+    const vector<string> table_args = {
+      "table", "create", master, json_str
+    };
+    ASSERT_OK(RunKuduTool(table_args));
+    ASSERT_EVENTUALLY([&] {
+      bool table_exists = false;
+      ASSERT_OK(kudu_client->TableExists(table_name, &table_exists));
+      ASSERT_TRUE(table_exists);
+    });
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(kudu_client->OpenTable(table_name, &table));
+    ASSERT_EQ(table_name, table->name());
+    ASSERT_EQ(schema, table->schema().ToString());
+    ASSERT_EQ(partition, table->partition_schema().DebugString(
+        KuduSchema::ToSchema(table->schema())));
+    ASSERT_EQ(extra_configs, table->extra_configs());
+  };
+
+  // Create a simple table.
+  string simple_table = R"(
+  {
+    "table_name": "simple_table",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1",
+          "encoding": 1,
+          "compression": 3
+        },
+        {
+          "column_name": "key",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "comment": "range key"
+        },
+        {
+          "column_name": "name",
+          "column_type": "BINARY",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id", "key"
+      ]
+    },
+    "extra_configs" : {
+      "configs" : {
+        "kudu.table.history_max_age_sec": "3600"
+      }
+    },
+    "num_replicas": 3,
+    "dimension_label": "test"
+  }
+  )";
+  string schema = "(\n id INT32 NOT NULL,\n key STRING NOT NULL,\n "
+                  "name BINARY NOT NULL,\n PRIMARY KEY (id, key)\n)";
+  string partition = "";
+  // Reused by every later case that sets the same history_max_age_sec.
+  map<string, string> extra_configs;
+  extra_configs["kudu.table.history_max_age_sec"] = "3600";
+  NO_FATALS(check_good_input(simple_table, master_addr, "simple_table",
+                             schema, partition, extra_configs, client.get()));
+
+  // Create a hash-partitioned table.
+  string hash_table = R"(
+  {
+    "table_name": "hash_table",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1",
+          "compression": 3
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "partition": {
+      "hash_partitions": [
+        {
+          "columns": ["id"],
+          "num_buckets": 2,
+          "seed": 100
+        }
+      ]
+    }
+  }
+  )";
+  schema = "(\n id INT32 NOT NULL,\n "
+           "name STRING NOT NULL,\n PRIMARY KEY (id)\n)";
+  partition = "HASH (id) PARTITIONS 2 SEED 100";
+  NO_FATALS(check_good_input(hash_table, master_addr, "hash_table",
+                             schema, partition, {}, client.get()));
+
+  // Create a range-partitioned table.
+  string range_table = R"(
+  {
+    "table_name": "range_table",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        },
+        {
+          "column_name": "score",
+          "column_type": "DOUBLE",
+          "is_nullable": false,
+          "default_value": "0.0",
+          "comment": "user score"
+        }
+      ],
+      "key_column_names": [
+        "id", "name"
+      ]
+    },
+    "partition": {
+      "range_partition": {
+        "columns": ["id", "name"],
+        "range_bounds": [
+          {
+            "upper_bound": {
+              "bound_type": 1,
+              "bound_values": ["2", "zhangsan"]
+            }
+          },
+          {
+            "lower_bound": {
+              "bound_type": 2,
+              "bound_values": ["2", "zhangsan"]
+            },
+            "upper_bound": {
+              "bound_type": 1,
+              "bound_values": ["3", "lisi"]
+            }
+          },
+          {
+            "lower_bound": {
+              "bound_type": "INCLUSIVE",
+              "bound_values": ["3", "lisi"]
+            }
+          }
+        ]
+      }
+    }
+  }
+  )";
+  schema = "(\n id INT32 NOT NULL,\n name STRING NOT NULL,\n"
+           " score DOUBLE NOT NULL,\n PRIMARY KEY (id, name)\n)";
+  partition = "RANGE (id, name)";
+  NO_FATALS(check_good_input(range_table, master_addr, "range_table",
+                             schema, partition, {}, client.get()));
+
+  // Create a hash+hash table.
+  string hash_hash_table = R"(
+  {
+    "table_name": "hash_hash_table",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "key",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id", "key"
+      ]
+    },
+    "partition": {
+      "hash_partitions": [
+        {
+          "columns": ["id"],
+          "num_buckets": 2
+        },
+        {
+          "columns": ["key"],
+          "num_buckets": 2
+        }
+      ]
+    },
+    "extra_configs" : {
+      "configs": {
+        "kudu.table.history_max_age_sec": "3600"
+      }
+    }
+  }
+  )";
+  schema = "(\n id INT32 NOT NULL,\n key STRING NOT NULL,\n"
+           " name STRING NOT NULL,\n PRIMARY KEY (id, key)\n)";
+  partition = "HASH (id) PARTITIONS 2, HASH (key) PARTITIONS 2";
+  NO_FATALS(check_good_input(hash_hash_table, master_addr, "hash_hash_table",
+                             schema, partition, extra_configs, client.get()));
+
+  // Create a hash+range table.
+  string hash_range_table = R"(
+  {
+    "table_name": "hash_range_table",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "key",
+          "column_type": "INT64",
+          "is_nullable": false,
+          "comment": "range key"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id", "key"
+      ]
+    },
+    "partition": {
+      "hash_partitions": [
+        {
+          "columns": ["id"],
+          "num_buckets": 2,
+          "seed": 100
+        }
+      ],
+      "range_partition": {
+        "columns": ["key"],
+        "range_bounds": [
+          {
+            "upper_bound": {
+              "bound_type": "EXCLUSIVE",
+              "bound_values": ["2"]
+            }
+          },
+          {
+            "lower_bound": {
+              "bound_type": "INCLUSIVE",
+              "bound_values": ["2"]
+            },
+            "upper_bound": {
+              "bound_type": "EXCLUSIVE",
+              "bound_values": ["3"]
+            }
+          },
+          {
+            "lower_bound": {
+              "bound_type": "INCLUSIVE",
+              "bound_values": ["3"]
+            }
+          }
+        ]
+      }
+    },
+    "extra_configs" : {
+      "configs" : {
+        "kudu.table.history_max_age_sec": "3600"
+      }
+    },
+    "num_replicas": 3,
+    "dimension_label": "test"
+  }
+  )";
+  schema = "(\n id INT32 NOT NULL,\n key INT64 NOT NULL,\n"
+           " name STRING NOT NULL,\n PRIMARY KEY (id, key)\n)";
+  partition = "HASH (id) PARTITIONS 2 SEED 100, RANGE (key)";
+  NO_FATALS(check_good_input(hash_range_table, master_addr, "hash_range_table",
+                             schema, partition, extra_configs, client.get()));
+
+  // Create a table with a DECIMAL column.
+  string decimal_table = R"(
+  {
+    "table_name": "decimal_table",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT64",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "score",
+          "column_type": "DECIMAL",
+          "type_attributes": {
+            "precision": 10,
+            "scale": 10
+          },
+          "is_nullable": false,
+          "comment": "range key"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "extra_configs" : {
+      "configs" : {
+        "kudu.table.history_max_age_sec": "3600"
+      }
+    },
+    "num_replicas": 3,
+    "dimension_label": "test"
+  }
+  )";
+  schema = "(\n id INT64 NOT NULL,\n score DECIMAL(10, 10) NOT NULL,\n"
+           " name STRING NOT NULL,\n PRIMARY KEY (id)\n)";
+  partition = "";
+  NO_FATALS(check_good_input(decimal_table, master_addr, "decimal_table",
+                             schema, partition, extra_configs, client.get()));
+
+  // Create a table using string names instead of integers for enum fields.
+  string enum_type_with_str = R"(
+  {
+    "table_name": "enum_type_with_str",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1",
+          "encoding": "plain_encoding"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name",
+          "compression": "zlib"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "partition": {
+      "range_partition": {
+        "columns": ["id"],
+        "range_bounds": [
+          {
+            "upper_bound": {
+              "bound_type": "EXCLUSIVE",
+              "bound_values": ["2"]
+            }
+          },
+          {
+            "lower_bound": {
+              "bound_type": "INCLUSIVE",
+              "bound_values": ["2"]
+            },
+            "upper_bound": {
+              "bound_type": "EXCLUSIVE",
+              "bound_values": ["3"]
+            }
+          },
+          {
+            "lower_bound": {
+              "bound_type": "INCLUSIVE",
+              "bound_values": ["3"]
+            }
+          }
+        ]
+      }
+    }
+  }
+  )";
+  schema = "(\n id INT32 NOT NULL,\n name STRING NOT NULL,\n"
+           " PRIMARY KEY (id)\n)";
+  partition = "RANGE (id)";
+  NO_FATALS(check_good_input(enum_type_with_str, master_addr,
+      "enum_type_with_str", schema, partition, {}, client.get()));
+
+  // Create a table with an invalid compression type, which is converted to
+  // the default.
+  string compression_type_unknown = R"(
+  {
+    "table_name": "compression_type_unknown",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1",
+          "compression": 300,
+          "comment": "id"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    }
+  }
+  )";
+  schema = "(\n id INT32 NOT NULL,\n name STRING NOT NULL,\n"
+           " PRIMARY KEY (id)\n)";
+  partition = "";
+  NO_FATALS(check_good_input(compression_type_unknown, master_addr,
+      "compression_type_unknown", schema, partition, {}, client.get()));
+
+  // Create a table with an invalid encoding type, which is converted to the
+  // default.
+  string encoding_type_unknown = R"(
+  {
+    "table_name": "encoding_type_unknown",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1",
+          "encoding": 200
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    }
+  }
+  )";
+  schema = "(\n id INT32 NOT NULL,\n name STRING NOT NULL,\n"
+           " PRIMARY KEY (id)\n)";
+  partition = "";
+  NO_FATALS(check_good_input(encoding_type_unknown, master_addr,
+      "encoding_type_unknown", schema, partition, {}, client.get()));
+
+  // Runs 'kudu table create <master> <json_str>', expecting the tool to fail
+  // with 'err' somewhere in its stderr output.
+  const auto check_bad_input = [&](const string& json_str,
+                                   const string& master,
+                                   const string& err) {
+    // 'stdout' and 'stderr' are <cstdio> macros, so don't reuse them as
+    // local variable names.
+    string out;
+    string err_out;
+    const vector<string> table_args = {
+      "table", "create", master, json_str
+    };
+    Status s = RunKuduTool(table_args, &out, &err_out);
+    ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+    ASSERT_STR_CONTAINS(err_out, err);
+  };
+
+  // The JSON string is empty.
+  string empty_string = "";
+  string err = "Unexpected end of string. Expected a value";
+  NO_FATALS(check_bad_input(empty_string, master_addr, err));
+
+  // The JSON object is empty.
+  string empty_json = "{}";
+  err = "Invalid argument: Missing table name";
+  NO_FATALS(check_bad_input(empty_json, master_addr, err));
+
+  // The JSON object contains an unknown field.
+  string invalid_json = "{\"table\": \"decimal_table\"}";
+  err = "Cannot find field";
+  NO_FATALS(check_bad_input(invalid_json, master_addr, err));
+
+  // Create a table without a primary key.
+  string table_without_pk = R"(
+  {
+    "table_name": "table_without_pk",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ]
+    },
+    "partition": {
+      "hash_partitions": [
+        {
+          "columns": ["id"],
+          "num_buckets": 2
+        }
+      ]
+    }
+  }
+  )";
+  err = "must specify at least one key column";
+  NO_FATALS(check_bad_input(table_without_pk, master_addr, err));
+
+  // Create a table without a table name.
+  string table_without_name = R"(
+  {
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "partition": {
+      "hash_partitions": [
+        {
+          "columns": ["id"],
+          "num_buckets": 2
+        }
+      ]
+    }
+  }
+  )";
+  err = "Missing table name";
+  NO_FATALS(check_bad_input(table_without_name, master_addr, err));
+
+  // Create a table whose primary key names an undefined column.
+  string primary_key_error = R"(
+  {
+    "table_name": "primary_key_error",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "ID"
+      ]
+    }
+  }
+  )";
+  err = "primary key column not defined";
+  NO_FATALS(check_bad_input(primary_key_error, master_addr, err));
+
+  // Create a table with too few hash buckets.
+  string hash_bucket_error = R"(
+  {
+    "table_name": "hash_bucket_error",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "partition": {
+      "hash_partitions": [
+        {
+          "columns": ["id"],
+          "num_buckets": 1
+        }
+      ]
+    }
+  }
+  )";
+  err = "must have at least two hash buckets";
+  NO_FATALS(check_bad_input(hash_bucket_error, master_addr, err));
+
+  // Create a table whose hash partition names an unknown column.
+  string hash_key_error = R"(
+  {
+    "table_name": "hash_key_error",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "partition": {
+      "hash_partitions": [
+        {
+          "columns": ["ID"],
+          "num_buckets": 2
+        }
+      ]
+    }
+  }
+  )";
+  err = "unknown column: name: \"ID\"";
+  NO_FATALS(check_bad_input(hash_key_error, master_addr, err));
+
+  // Create a table with a range key error.
+  string range_key_error = R"(
+  {
+    "table_name": "range_key_error",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "partition": {
+      "range_partition": {
+        "columns": ["key"],
+        "range_bounds": [
+          {
+            "upper_bound": {
+              "bound_type": 1,
+              "bound_values": ["2"]
+            }
+          },
+          {
+            "lower_bound": {
+              "bound_type": 2,
+              "bound_values": ["2"]
+            },
+            "upper_bound": {
+              "bound_type": 1,
+              "bound_values": ["3"]
+            }
+          }
+        ]
+      }
+    }
+  }
+  )";
+  err = "Invalid range value size, value size should be equal to number of "
+        "range keys";
+  NO_FATALS(check_bad_input(range_key_error, master_addr, err));
+
+  // Create a table with overlapping range bounds.
+  string range_bound_error = R"(
+  {
+    "table_name": "range_bound_error",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "partition": {
+      "range_partition": {
+        "columns": ["id"],
+        "range_bounds": [
+          {
+            "upper_bound": {
+              "bound_type": 1,
+              "bound_values": ["3"]
+            }
+          },
+          {
+            "lower_bound": {
+              "bound_type": 2,
+              "bound_values": ["2"]
+            },
+            "upper_bound": {
+              "bound_type": 1,
+              "bound_values": ["4"]
+            }
+          }
+        ]
+      }
+    }
+  }
+  )";
+  err = "overlapping range partitions:";
+  NO_FATALS(check_bad_input(range_bound_error, master_addr, err));
+
+  // Create a table with a range bound value that does not match the column
+  // type.
+  string range_bound_value_error = R"(
+  {
+    "table_name": "range_bound_value_error",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "partition": {
+      "range_partition": {
+        "columns": ["id"],
+        "range_bounds": [
+          {
+            "upper_bound": {
+              "bound_type": 1,
+              "bound_values": ["abc"]
+            }
+          },
+          {
+            "lower_bound": {
+              "bound_type": 2,
+              "bound_values": ["2"]
+            },
+            "upper_bound": {
+              "bound_type": 1,
+              "bound_values": ["4"]
+            }
+          }
+        ]
+      }
+    }
+  }
+  )";
+  err = "JSON text is corrupt: Invalid value";
+  NO_FATALS(check_bad_input(range_bound_value_error, master_addr, err));
+
+  // Create a table with an unsupported column type.
+  string column_type_error = R"(
+  {
+    "table_name": "column_type_error",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT31",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "partition": {
+      "hash_partitions": [
+        {
+          "columns": ["id"],
+          "num_buckets": 2
+        }
+      ]
+    }
+  }
+  )";
+  err = "data type INT31 is not supported";
+  NO_FATALS(check_bad_input(column_type_error, master_addr, err));
+
+  // Create a table with a default value that does not match the column type.
+  string column_value_error = R"(
+  {
+    "table_name": "column_value_error",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "abc"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "partition": {
+      "hash_partitions": [
+        {
+          "columns": ["id"],
+          "num_buckets": 2
+        }
+      ]
+    }
+  }
+  )";
+  err = "JSON text is corrupt: Invalid value";
+  NO_FATALS(check_bad_input(column_value_error, master_addr, err));
+
+  // Create a table whose primary key column is not listed first.
+  string pk_not_first = R"(
+  {
+    "table_name": "pk_not_first",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "name"
+      ]
+    },
+    "partition": {
+      "hash_partitions": [
+        {
+          "columns": ["name"],
+          "num_buckets": 2
+        }
+      ]
+    }
+  }
+  )";
+  err = "primary key columns must be listed first in the schema";
+  NO_FATALS(check_bad_input(pk_not_first, master_addr, err));
+
+  // Create a table whose column type and encoding conflict.
+  string encoding_type_conflict = R"(
+  {
+    "table_name": "encoding_type_conflict",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1",
+          "encoding": 4
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name",
+          "encoding": 4
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "partition": {
+      "hash_partitions": [
+        {
+          "columns": ["id"],
+          "num_buckets": 2
+        }
+      ]
+    }
+  }
+  )";
+  err = "encoding DICT_ENCODING not supported for type INT32";
+  NO_FATALS(check_bad_input(encoding_type_conflict, master_addr, err));
+
+  // Create a table with an unknown encoding name.
+  string encoding_type_error = R"(
+  {
+    "table_name": "encoding_type_error",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1",
+          "encoding": "error_encoding"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name",
+          "compression": "zlib"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "partition": {
+      "range_partition": {
+        "columns": ["id"],
+        "range_bounds": [
+          {
+            "upper_bound": {
+              "bound_type": "INCLUSIVE",
+              "bound_values": ["2"]
+            }
+          },
+          {
+            "lower_bound": {
+              "bound_type": "INCLUSIVE",
+              "bound_values": ["2"]
+            },
+            "upper_bound": {
+              "bound_type": "EXCLUSIVE",
+              "bound_values": ["3"]
+            }
+          },
+          {
+            "lower_bound": {
+              "bound_type": "INCLUSIVE",
+              "bound_values": ["3"]
+            }
+          }
+        ]
+      }
+    }
+  }
+  )";
+  err = "unable to parse JSON";
+  NO_FATALS(check_bad_input(encoding_type_error, master_addr, err));
+
+  // Create a table with an unexpected range partition bound type.
+  string bound_type_error = R"(
+  {
+    "table_name": "bound_type_error",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1",
+          "encoding": 1
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    },
+    "partition": {
+      "range_partition": {
+        "columns": ["id"],
+        "range_bounds": [
+          {
+            "upper_bound": {
+              "bound_type": 3,
+              "bound_values": ["2"]
+            }
+          },
+          {
+            "lower_bound": {
+              "bound_type": "INCLUSIVE",
+              "bound_values": ["3"]
+            }
+          }
+        ]
+      }
+    }
+  }
+  )";
+  err = "Unexpected range partition bound type";
+  NO_FATALS(check_bad_input(bound_type_error, master_addr, err));
+
+  // Create a table with a nullable primary key column.
+  string pk_is_nullable = R"(
+  {
+    "table_name": "pk_is_nullable",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": true,
+          "default_value": "1"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    }
+  }
+  )";
+  err = "Nullable key columns are not supported";
+  NO_FATALS(check_bad_input(pk_is_nullable, master_addr, err));
+
+  // Create a table that already exists (created by the first good case).
+  string existing_table = R"(
+  {
+    "table_name": "simple_table",
+    "schema": {
+      "columns": [
+        {
+          "column_name": "id",
+          "column_type": "INT32",
+          "is_nullable": false,
+          "default_value": "1"
+        },
+        {
+          "column_name": "name",
+          "column_type": "STRING",
+          "is_nullable": false,
+          "default_value": "zhangsan",
+          "comment": "user name"
+        }
+      ],
+      "key_column_names": [
+        "id"
+      ]
+    }
+  }
+  )";
+  err = "table simple_table already exists with id";
+  NO_FATALS(check_bad_input(existing_table, master_addr, err));
+}
+
+} // namespace tools
+} // namespace kudu
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 55a47e5..126fba0 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -3415,7 +3415,7 @@ TEST_F(ToolTest, TestAlterColumn) {
// Test invalid compression type.
NO_FATALS(check_bad_input("column_set_compression",
"UNKNOWN_COMPRESSION_TYPE",
- "Failed to parse compression type"));
+ "compression type UNKNOWN_COMPRESSION_TYPE is not
supported"));
// Alter encoding type for a column.
NO_FATALS(RunActionStdoutNone(Substitute("table column_set_encoding $0 $1 $2
$3",
@@ -3428,7 +3428,7 @@ TEST_F(ToolTest, TestAlterColumn) {
// Test invalid encoding type.
NO_FATALS(check_bad_input("column_set_encoding",
"UNKNOWN_ENCODING_TYPE",
- "Failed to parse encoding type"));
+ "encoding type UNKNOWN_ENCODING_TYPE is not
supported"));
// Alter block_size for a column.
NO_FATALS(RunActionStdoutNone(Substitute("table column_set_block_size $0 $1
$2 $3",
diff --git a/src/kudu/tools/tool.proto b/src/kudu/tools/tool.proto
index 570ef45..da11ea2 100644
--- a/src/kudu/tools/tool.proto
+++ b/src/kudu/tools/tool.proto
@@ -326,3 +326,123 @@ message KsckCountSummaryPB {
optional int32 tablets = 4;
optional int32 replicas = 5;
}
+
+// Description of one column of the table created by the 'kudu table create'
+// tool; part of SchemaPB in CreateTablePB.
+message ColumnPB {
+ // Column encodings. Mapped onto KuduColumnStorageAttributes::EncodingType
+ // by ToClientEncodingType() in tool_action_table.cc.
+ enum EncodingType {
+ AUTO_ENCODING = 0;
+ PLAIN_ENCODING = 1;
+ PREFIX_ENCODING = 2;
+ RLE = 3;
+ DICT_ENCODING = 4;
+ BIT_SHUFFLE = 5;
+ }
+ // Column compression codecs. Mapped onto
+ // KuduColumnStorageAttributes::CompressionType by ToClientCompressionType()
+ // in tool_action_table.cc.
+ enum CompressionType {
+ DEFAULT_COMPRESSION = 0;
+ NO_COMPRESSION = 1;
+ SNAPPY = 2;
+ LZ4 = 3;
+ ZLIB = 4;
+ }
+ // Type attributes of a DECIMAL column.
+ message ColumnAttributesPB {
+ // Decimal precision and scale.
+ optional int32 precision = 1;
+ optional int32 scale = 2;
+ }
+ // Name of the column.
+ optional string column_name = 1;
+ // Name of the column's data type, e.g. "INT32" or "STRING"; converted with
+ // KuduColumnSchema::StringToDataType().
+ optional string column_type = 2;
+ // The column is created NOT NULL unless this is set to true.
+ optional bool is_nullable = 3;
+ // The default value takes the form of a literal string.
+ optional string default_value = 4;
+ // Free-form comment attached to the column.
+ optional string comment = 5;
+ // If unset, no encoding is requested explicitly.
+ optional EncodingType encoding = 6;
+ // If unset, no compression codec is requested explicitly.
+ optional CompressionType compression = 7;
+ // Column's type attributes, used for DECIMAL type columns.
+ optional ColumnAttributesPB type_attributes = 8;
+ // The preferred block size for cfile blocks.
+ optional int32 cfile_block_size = 9;
+}
+
+// Partitioning of the table to create: zero or more hash partitioning levels
+// plus an optional range partitioning.
+message PartitionPB {
+ message RangePartitionPB {
+ // One end of a range partition.
+ message BoundPB {
+ enum Type {
+ UNKNOWN_BOUND = 0;
+ EXCLUSIVE = 1;
+ INCLUSIVE = 2;
+ }
+ // The type of range partition bound, "INCLUSIVE" or "EXCLUSIVE".
+ // UNKNOWN_BOUND (the value when the field is unset) is rejected.
+ optional Type bound_type = 1;
+ // The start or end partition key values, one per range partition column.
+ // Each value should simply be the exact string value for the bound.
+ repeated string bound_values = 2;
+ }
+ // A single range partition. When a bound is omitted, the tool uses an
+ // empty row for it (an inclusive lower / exclusive upper bound type).
+ message RangeBoundPB {
+ // The lower bound.
+ optional BoundPB lower_bound = 1;
+ // The upper bound.
+ optional BoundPB upper_bound = 2;
+ }
+ // A single range split point.
+ message SplitValuePB {
+ // The split values of the range partition keys, one per range partition
+ // column. Each value should simply be the exact string value for the split.
+ repeated string split_values = 1;
+ }
+
+ // Column names of columns included in the range. All columns must be
+ // a component of the primary key.
+ repeated string columns = 1;
+ // Range partitions to create.
+ repeated RangeBoundPB range_bounds = 2;
+ // Split points applied to the range partitions.
+ repeated SplitValuePB range_splits = 3;
+ }
+
+ message HashPartitionPB {
+ // Column names of columns included in the hash. Every column must be
+ // a component of the primary key.
+ repeated string columns = 1;
+ // Number of buckets into which columns will be hashed. Must be at least 2.
+ optional int32 num_buckets = 2;
+ // Seed value for hash calculation. Administrators may set a seed value
+ // on a per-table basis in order to randomize the mapping of rows to
+ // buckets. Setting a seed provides some amount of protection against denial
+ // of service attacks when the hash bucket columns contain user provided
+ // input. The tool uses 0 when it is unset.
+ optional uint32 seed = 3;
+ }
+
+ // Hash partitioning levels; zero or more may be specified.
+ repeated HashPartitionPB hash_partitions = 1;
+ // Range partitioning. If absent, the table creator is given an empty list
+ // of range partition columns.
+ optional RangePartitionPB range_partition = 2;
+}
+
+// Table-level extra configuration properties as key/value pairs, e.g.
+// "kudu.table.history_max_age_sec": "3600".
+message ExtraConfigPB {
+ map<string, string> configs = 1;
+}
+
+// Schema of the table to create.
+message SchemaPB {
+ // The table's columns, one ColumnPB per column.
+ repeated ColumnPB columns = 1;
+ // Names of the primary key columns, in key order.
+ repeated string key_column_names = 2;
+}
+
+// Create table protobuf message. The JSON message provided by the user is
+// converted to this PB; used for creating a new table with the kudu tool.
+message CreateTablePB {
+ // Name of the table to create.
+ optional string table_name = 1;
+ // Representation of the table's schema, including its columns and
+ // primary key.
+ optional SchemaPB schema = 2;
+ // The table's partitioning, including hash and range partitioning.
+ optional PartitionPB partition = 3;
+ // Number of tablet replicas; only passed to the table creator when set.
+ optional int32 num_replicas = 4;
+ // The table's extra configuration properties.
+ optional ExtraConfigPB extra_configs = 5;
+ // The dimension label for tablets that were created during table creation.
+ // Used for dimension-specific placement of tablet replicas corresponding to
+ // the partitions of the newly created table.
+ optional string dimension_label = 6;
+}
diff --git a/src/kudu/tools/tool_action_table.cc
b/src/kudu/tools/tool_action_table.cc
index 72f55e2..b68e079 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -31,6 +31,9 @@
#include <gflags/gflags.h>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
+#include <google/protobuf/stubs/status.h>
+#include <google/protobuf/stubs/stringpiece.h>
+#include <google/protobuf/util/json_util.h>
#include <rapidjson/document.h>
#include "kudu/client/client.h"
@@ -50,21 +53,24 @@
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tools/table_scanner.h"
+#include "kudu/tools/tool.pb.h"
#include "kudu/tools/tool_action.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/util/jsonreader.h"
#include "kudu/util/status.h"
-#include "kudu/util/string_case.h"
+using google::protobuf::RepeatedPtrField;
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduColumnSchema;
+using kudu::client::KuduColumnSpec;
using kudu::client::KuduColumnStorageAttributes;
using kudu::client::KuduPredicate;
using kudu::client::KuduScanToken;
using kudu::client::KuduScanTokenBuilder;
using kudu::client::KuduScanner;
using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduTable;
using kudu::client::KuduTableAlterer;
using kudu::client::KuduTableCreator;
@@ -74,6 +80,8 @@ using kudu::client::internal::ReplicaController;
using std::cerr;
using std::cout;
using std::endl;
+using std::map;
+using std::pair;
using std::set;
using std::string;
using std::unique_ptr;
@@ -170,6 +178,7 @@ const char* const kDefaultValueArg = "default_value";
const char* const kCompressionTypeArg = "compression_type";
const char* const kEncodingTypeArg = "encoding_type";
const char* const kBlockSizeArg = "block_size";
+const char* const kCreateTableJSONArg = "create_table_json";
enum PartitionAction {
ADD,
@@ -492,9 +501,10 @@ Status GetExtraConfigs(const RunnerContext& context) {
return data_table.PrintTo(cout);
}
-Status ConvertToKuduPartialRow(const client::sp::shared_ptr<KuduTable>& table,
- const string& range_bound_str,
- KuduPartialRow* range_bound_partial_row) {
+Status ConvertToKuduPartialRow(
+ const vector<pair<string, KuduColumnSchema::DataType>>& range_columns,
+ const string& range_bound_str,
+ KuduPartialRow* range_bound_partial_row) {
JsonReader reader(range_bound_str);
RETURN_NOT_OK(reader.Init());
vector<const rapidjson::Value *> values;
@@ -506,22 +516,16 @@ Status ConvertToKuduPartialRow(const
client::sp::shared_ptr<KuduTable>& table,
if (values.empty()) {
return Status::OK();
}
- const Schema& schema = KuduSchema::ToSchema(table->schema());
- const auto& partition_schema = table->partition_schema();
- vector<int32_t> key_indexes;
- RETURN_NOT_OK(partition_schema.GetRangeSchemaColumnIndexes(schema,
&key_indexes));
- if (values.size() != key_indexes.size()) {
+ if (values.size() != range_columns.size()) {
return Status::InvalidArgument(
Substitute("wrong number of range columns specified: expected $0 but
received $1",
- key_indexes.size(),
+ range_columns.size(),
values.size()));
}
for (int i = 0; i < values.size(); i++) {
- const auto key_index = key_indexes[i];
- const auto& column = table->schema().Column(key_index);
- const auto& col_name = column.name();
- const auto type = column.type();
+ const auto& col_name = range_columns[i].first;
+ const auto type = range_columns[i].second;
const auto error_msg = Substitute(kErrorMsgArg, values[i], col_name,
KuduColumnSchema::DataTypeToString(type));
switch (type) {
@@ -633,8 +637,23 @@ Status ModifyRangePartition(const RunnerContext& context,
PartitionAction action
unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
- RETURN_NOT_OK(ConvertToKuduPartialRow(table, table_range_lower_bound,
lower_bound.get()));
- RETURN_NOT_OK(ConvertToKuduPartialRow(table, table_range_upper_bound,
upper_bound.get()));
+ vector<pair<string, KuduColumnSchema::DataType>> range_col_names_and_types;
+ const Schema& schema_tmp = KuduSchema::ToSchema(schema);
+ const auto& partition_schema = table->partition_schema();
+ vector<int32_t> key_indexes;
+ RETURN_NOT_OK(partition_schema.GetRangeSchemaColumnIndexes(schema_tmp,
&key_indexes));
+ for (int i = 0; i < key_indexes.size(); i++) {
+ const auto key_index = key_indexes[i];
+ const auto& column = schema.Column(key_index);
+ range_col_names_and_types.emplace_back(std::make_pair(column.name(),
+ column.type()));
+ }
+ RETURN_NOT_OK(ConvertToKuduPartialRow(range_col_names_and_types,
+ table_range_lower_bound,
+ lower_bound.get()));
+ RETURN_NOT_OK(ConvertToKuduPartialRow(range_col_names_and_types,
+ table_range_upper_bound,
+ upper_bound.get()));
KuduTableCreator::RangePartitionBound lower_bound_type;
KuduTableCreator::RangePartitionBound upper_bound_type;
@@ -767,33 +786,19 @@ Status ColumnRemoveDefault(const RunnerContext& context) {
return alterer->Alter();
}
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
Status ColumnSetCompression(const RunnerContext& context) {
const string& table_name = FindOrDie(context.required_args, kTableNameArg);
const string& column_name = FindOrDie(context.required_args, kColumnNameArg);
const string& compression_type_arg = FindOrDie(context.required_args,
kCompressionTypeArg);
- std::string compression_type_uc;
- ToUpperCase(compression_type_arg, &compression_type_uc);
-
- static unordered_map<string, KuduColumnStorageAttributes::CompressionType>
compression_type_map =
- {{"DEFAULT_COMPRESSION",
KuduColumnStorageAttributes::CompressionType::DEFAULT_COMPRESSION},
- {"NO_COMPRESSION",
KuduColumnStorageAttributes::CompressionType::NO_COMPRESSION},
- {"SNAPPY", KuduColumnStorageAttributes::CompressionType::SNAPPY},
- {"LZ4", KuduColumnStorageAttributes::CompressionType::LZ4},
- {"ZLIB", KuduColumnStorageAttributes::CompressionType::ZLIB}};
-
- const KuduColumnStorageAttributes::CompressionType* compression_type =
- FindOrNull(compression_type_map, compression_type_uc);
- if (!compression_type) {
- return Status::InvalidArgument(Substitute(
- "Failed to parse compression type from $0, supported compression types
are: $1.",
- compression_type_arg,
- JoinKeysIterator(compression_type_map.begin(),
compression_type_map.end(), ", ")));
- }
-
+ KuduColumnStorageAttributes::CompressionType compression_type;
+ RETURN_NOT_OK(KuduColumnStorageAttributes::StringToCompressionType(
+ compression_type_arg, &compression_type));
client::sp::shared_ptr<KuduClient> client;
RETURN_NOT_OK(CreateKuduClient(context, &client));
unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
- alterer->AlterColumn(column_name)->Compression(*compression_type);
+ alterer->AlterColumn(column_name)->Compression(compression_type);
return alterer->Alter();
}
@@ -801,32 +806,16 @@ Status ColumnSetEncoding(const RunnerContext& context) {
const string& table_name = FindOrDie(context.required_args, kTableNameArg);
const string& column_name = FindOrDie(context.required_args, kColumnNameArg);
const string& encoding_type_arg = FindOrDie(context.required_args,
kEncodingTypeArg);
- std::string encoding_type_uc;
- ToUpperCase(encoding_type_arg, &encoding_type_uc);
-
- static unordered_map<string, KuduColumnStorageAttributes::EncodingType>
encoding_type_map =
- {{"AUTO_ENCODING",
KuduColumnStorageAttributes::EncodingType::AUTO_ENCODING},
- {"PLAIN_ENCODING",
KuduColumnStorageAttributes::EncodingType::PLAIN_ENCODING},
- {"PREFIX_ENCODING",
KuduColumnStorageAttributes::EncodingType::PREFIX_ENCODING},
- {"RLE", KuduColumnStorageAttributes::EncodingType::RLE},
- {"DICT_ENCODING",
KuduColumnStorageAttributes::EncodingType::DICT_ENCODING},
- {"BIT_SHUFFLE", KuduColumnStorageAttributes::EncodingType::BIT_SHUFFLE}};
-
- const KuduColumnStorageAttributes::EncodingType* encoding_type =
- FindOrNull(encoding_type_map, encoding_type_uc);
- if (!encoding_type) {
- return Status::InvalidArgument(Substitute(
- "Failed to parse encoding type from $0, supported encoding types are:
$1.",
- encoding_type_arg,
- JoinKeysIterator(encoding_type_map.begin(), encoding_type_map.end(), ",
")));
- }
-
+ KuduColumnStorageAttributes::EncodingType encoding_type;
+ RETURN_NOT_OK(KuduColumnStorageAttributes::StringToEncodingType(
+ encoding_type_arg, &encoding_type));
client::sp::shared_ptr<KuduClient> client;
RETURN_NOT_OK(CreateKuduClient(context, &client));
unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
- alterer->AlterColumn(column_name)->Encoding(*encoding_type);
+ alterer->AlterColumn(column_name)->Encoding(encoding_type);
return alterer->Alter();
}
+#pragma GCC diagnostic pop
Status ColumnSetBlockSize(const RunnerContext& context) {
const string& table_name = FindOrDie(context.required_args, kTableNameArg);
@@ -877,6 +866,267 @@ Status GetTableStatistics(const RunnerContext& context) {
return Status::OK();
}
+// Returns 'raw' as a double-quoted JSON string literal, escaping the
+// characters that JSON does not allow verbatim inside a string.
+string QuoteJsonString(const string& raw) {
+  static const char kHexDigits[] = "0123456789abcdef";
+  string quoted = "\"";
+  quoted.reserve(raw.size() + 2);
+  for (const char c : raw) {
+    switch (c) {
+      case '"':  quoted += "\\\""; break;
+      case '\\': quoted += "\\\\"; break;
+      case '\n': quoted += "\\n"; break;
+      case '\r': quoted += "\\r"; break;
+      case '\t': quoted += "\\t"; break;
+      default: {
+        const auto uc = static_cast<unsigned char>(c);
+        if (uc < 0x20) {
+          // Remaining control characters use the \u00XX form.
+          quoted += "\\u00";
+          quoted += kHexDigits[(uc >> 4) & 0xF];
+          quoted += kHexDigits[uc & 0xF];
+        } else {
+          quoted += c;
+        }
+        break;
+      }
+    }
+  }
+  quoted += '"';
+  return quoted;
+}
+
+// Renders the textual bound or split 'values' of a range partition as a JSON
+// array (e.g. ["abc",5]) for ConvertToKuduPartialRow(). 'values' and
+// 'range_columns' are matched by position. Values of STRING and BINARY
+// columns become quoted, escaped JSON strings, so embedded '"' or '\' survive
+// the round trip. All other values are emitted verbatim as JSON literals.
+//
+// Returns InvalidArgument if 'values' is empty or does not hold exactly one
+// value per range column.
+Status ToJsonPartialRow(const RepeatedPtrField<string>& values,
+                        const vector<pair<string, KuduColumnSchema::DataType>>& range_columns,
+                        string* json_value) {
+  json_value->clear();
+  if (values.empty() || static_cast<size_t>(values.size()) != range_columns.size()) {
+    return Status::InvalidArgument(
+        "Invalid range value size, value size should be equal to number of range keys.");
+  }
+  string json = "[";
+  for (int i = 0; i < values.size(); ++i) {
+    if (i > 0) {
+      json += ",";
+    }
+    const string& value = values.Get(i);
+    const auto data_type = range_columns[i].second;
+    if (data_type == KuduColumnSchema::STRING ||
+        data_type == KuduColumnSchema::BINARY) {
+      json += QuoteJsonString(value);
+    } else {
+      json += value;
+    }
+  }
+  json += "]";
+  json_value->swap(json);
+  return Status::OK();
+}
+
+// Translates the tool's ColumnPB::EncodingType into the client API's
+// KuduColumnStorageAttributes::EncodingType. '*type' is written only on
+// success; an unrecognized value yields InvalidArgument.
+Status ToClientEncodingType(ColumnPB::EncodingType type_pb,
+                            KuduColumnStorageAttributes::EncodingType* type) {
+  switch (type_pb) {
+    case ColumnPB::AUTO_ENCODING:
+      *type = KuduColumnStorageAttributes::AUTO_ENCODING;
+      return Status::OK();
+    case ColumnPB::PLAIN_ENCODING:
+      *type = KuduColumnStorageAttributes::PLAIN_ENCODING;
+      return Status::OK();
+    case ColumnPB::PREFIX_ENCODING:
+      *type = KuduColumnStorageAttributes::PREFIX_ENCODING;
+      return Status::OK();
+    case ColumnPB::DICT_ENCODING:
+      *type = KuduColumnStorageAttributes::DICT_ENCODING;
+      return Status::OK();
+    case ColumnPB::RLE:
+      *type = KuduColumnStorageAttributes::RLE;
+      return Status::OK();
+    case ColumnPB::BIT_SHUFFLE:
+      *type = KuduColumnStorageAttributes::BIT_SHUFFLE;
+      return Status::OK();
+    default:
+      break;
+  }
+  return Status::InvalidArgument(Substitute("Unexpected encoding type: $0", type_pb));
+}
+
+// Translates the tool's ColumnPB::CompressionType into the client API's
+// KuduColumnStorageAttributes::CompressionType. '*type' is written only on
+// success; an unrecognized value yields InvalidArgument.
+Status ToClientCompressionType(ColumnPB::CompressionType type_pb,
+                               KuduColumnStorageAttributes::CompressionType* type) {
+  switch (type_pb) {
+    case ColumnPB::DEFAULT_COMPRESSION:
+      *type = KuduColumnStorageAttributes::DEFAULT_COMPRESSION;
+      return Status::OK();
+    case ColumnPB::NO_COMPRESSION:
+      *type = KuduColumnStorageAttributes::NO_COMPRESSION;
+      return Status::OK();
+    case ColumnPB::SNAPPY:
+      *type = KuduColumnStorageAttributes::SNAPPY;
+      return Status::OK();
+    case ColumnPB::LZ4:
+      *type = KuduColumnStorageAttributes::LZ4;
+      return Status::OK();
+    case ColumnPB::ZLIB:
+      *type = KuduColumnStorageAttributes::ZLIB;
+      return Status::OK();
+    default:
+      break;
+  }
+  return Status::InvalidArgument(Substitute("Unexpected compression type: $0", type_pb));
+}
+
+// Translates a tool-level range bound type into the client API's
+// KuduTableCreator::RangePartitionBound. UNKNOWN_BOUND (the proto default when
+// "bound_type" is not set) and any unrecognized value yield InvalidArgument.
+// '*type' is written only on success.
+Status ToClientRangePartitionBound(
+    PartitionPB_RangePartitionPB_BoundPB::Type type_pb,
+    KuduTableCreator::RangePartitionBound* type) {
+  switch (type_pb) {
+    case PartitionPB_RangePartitionPB_BoundPB::EXCLUSIVE:
+      *type = KuduTableCreator::EXCLUSIVE_BOUND;
+      return Status::OK();
+    case PartitionPB_RangePartitionPB_BoundPB::INCLUSIVE:
+      *type = KuduTableCreator::INCLUSIVE_BOUND;
+      return Status::OK();
+    case PartitionPB_RangePartitionPB_BoundPB::UNKNOWN_BOUND:
+    default:
+      break;
+  }
+  // The '$0' placeholder is required; without it Substitute() drops 'type_pb'
+  // and the message does not say which value was rejected.
+  return Status::InvalidArgument(
+      Substitute("Unexpected range partition bound type: $0", type_pb));
+}
+
+// Builds 'kudu_schema' from the tool-level SchemaPB. Each ColumnPB is turned
+// into a column spec: type, DECIMAL attributes, nullability, default value,
+// comment, encoding, compression and cfile block size.
+// 'key_column_names' becomes the primary key.
+//
+// Returns an error if a type name, default value, encoding or compression
+// cannot be converted, or if the resulting schema fails to build.
+Status ParseTableSchema(const SchemaPB& schema,
+                        KuduSchema* kudu_schema) {
+  KuduSchemaBuilder b;
+  for (const auto& column : schema.columns()) {
+    KuduColumnSpec* spec = b.AddColumn(column.column_name());
+    // Initialized so the compiler can prove it is defined; it is only read
+    // after StringToDataType() returns OK.
+    KuduColumnSchema::DataType type = KuduColumnSchema::INT8;
+    RETURN_NOT_OK(KuduColumnSchema::StringToDataType(column.column_type(), &type));
+    spec->Type(type);
+    if (column.has_type_attributes()) {
+      spec->Precision(column.type_attributes().precision());
+      spec->Scale(column.type_attributes().scale());
+    }
+    if (!column.is_nullable()) {
+      spec->NotNull();
+    }
+    if (column.has_default_value()) {
+      // ParseValueOfType() is handed a one-element JSON array, so values of
+      // string-like types (and DECIMAL) must be quoted. Test the parsed
+      // DataType rather than the raw 'column_type' text, so the decision does
+      // not depend on how the type name was spelled or cased.
+      const bool quoted = type == KuduColumnSchema::STRING ||
+                          type == KuduColumnSchema::BINARY ||
+                          type == KuduColumnSchema::DECIMAL;
+      const string default_v = quoted ? "[\"" + column.default_value() + "\"]"
+                                      : "[" + column.default_value() + "]";
+      KuduValue* value = nullptr;
+      RETURN_NOT_OK(ParseValueOfType(default_v, type, &value));
+      spec->Default(value);
+    }
+    if (column.has_comment()) {
+      spec->Comment(column.comment());
+    }
+    // If no encoding is provided, none is requested explicitly
+    // (AUTO_ENCODING).
+    if (column.has_encoding()) {
+      KuduColumnStorageAttributes::EncodingType encoding =
+          KuduColumnStorageAttributes::AUTO_ENCODING;
+      RETURN_NOT_OK(ToClientEncodingType(column.encoding(), &encoding));
+      spec->Encoding(encoding);
+    }
+    // If no compression is provided, none is requested explicitly
+    // (DEFAULT_COMPRESSION).
+    if (column.has_compression()) {
+      KuduColumnStorageAttributes::CompressionType compression =
+          KuduColumnStorageAttributes::DEFAULT_COMPRESSION;
+      RETURN_NOT_OK(ToClientCompressionType(column.compression(), &compression));
+      spec->Compression(compression);
+    }
+    // ColumnPB carries a preferred cfile block size; forward it when given.
+    if (column.has_cfile_block_size()) {
+      spec->BlockSize(column.cfile_block_size());
+    }
+  }
+
+  b.SetPrimaryKey(vector<string>(schema.key_column_names().begin(),
+                                 schema.key_column_names().end()));
+  return b.Build(kudu_schema);
+}
+
+// Applies the partitioning described by 'partition' to 'table_creator'.
+// Every HashPartitionPB becomes a hash partitioning level. The optional
+// RangePartitionPB supplies the range partition columns, the range partitions
+// and the split rows. Without a RangePartitionPB, an empty list of range
+// partition columns is set.
+//
+// Range partition columns keep the order in which the user listed them;
+// repeated names are ignored. That order defines the range partition key, and
+// every bound/split value list is matched positionally against it.
+Status ParseTablePartition(const PartitionPB& partition,
+                           const KuduSchema& kudu_schema,
+                           KuduTableCreator* table_creator) {
+  for (const auto& hash_partition : partition.hash_partitions()) {
+    const vector<string> hash_keys(hash_partition.columns().begin(),
+                                   hash_partition.columns().end());
+    // The PB seed is unsigned whereas the client API takes an int32_t.
+    const int32_t seed =
+        hash_partition.has_seed() ? static_cast<int32_t>(hash_partition.seed()) : 0;
+    table_creator->add_hash_partitions(hash_keys, hash_partition.num_buckets(), seed);
+  }
+
+  // Generate and add the range partition splits for the table.
+  if (!partition.has_range_partition()) {
+    table_creator->set_range_partition_columns({});
+    return Status::OK();
+  }
+  const auto& range_partition = partition.range_partition();
+
+  // Types of all schema columns, by name.
+  map<string, KuduColumnSchema::DataType> column_types;
+  for (size_t idx = 0; idx < kudu_schema.num_columns(); ++idx) {
+    const KuduColumnSchema column = kudu_schema.Column(idx);
+    column_types.emplace(column.name(), column.type());
+  }
+
+  // Preserve the user-specified order. Sorting the names would change the
+  // range partition key and misalign the bound/split values below.
+  vector<string> range_col_names;
+  vector<pair<string, KuduColumnSchema::DataType>> range_col_names_and_types;
+  set<string> seen;
+  for (const auto& range_key : range_partition.columns()) {
+    if (!seen.insert(range_key).second) {
+      continue;
+    }
+    range_col_names.push_back(range_key);
+    // A column missing from the schema contributes no type, so any bound or
+    // split then fails the size check in ToJsonPartialRow().
+    if (const auto* col_type = FindOrNull(column_types, range_key)) {
+      range_col_names_and_types.emplace_back(range_key, *col_type);
+    }
+  }
+  table_creator->set_range_partition_columns(range_col_names);
+
+  string bound_partial_row_json;
+  for (const auto& bound : range_partition.range_bounds()) {
+    unique_ptr<KuduPartialRow> lower_bound(kudu_schema.NewRow());
+    unique_ptr<KuduPartialRow> upper_bound(kudu_schema.NewRow());
+    // Bound types used when a side of the range is omitted.
+    KuduTableCreator::RangePartitionBound lower_bound_type =
+        KuduTableCreator::INCLUSIVE_BOUND;
+    KuduTableCreator::RangePartitionBound upper_bound_type =
+        KuduTableCreator::EXCLUSIVE_BOUND;
+    if (bound.has_lower_bound()) {
+      RETURN_NOT_OK(ToJsonPartialRow(bound.lower_bound().bound_values(),
+                                     range_col_names_and_types,
+                                     &bound_partial_row_json));
+      RETURN_NOT_OK(ConvertToKuduPartialRow(range_col_names_and_types,
+                                            bound_partial_row_json,
+                                            lower_bound.get()));
+      RETURN_NOT_OK(ToClientRangePartitionBound(bound.lower_bound().bound_type(),
+                                                &lower_bound_type));
+    }
+    if (bound.has_upper_bound()) {
+      RETURN_NOT_OK(ToJsonPartialRow(bound.upper_bound().bound_values(),
+                                     range_col_names_and_types,
+                                     &bound_partial_row_json));
+      RETURN_NOT_OK(ConvertToKuduPartialRow(range_col_names_and_types,
+                                            bound_partial_row_json,
+                                            upper_bound.get()));
+      RETURN_NOT_OK(ToClientRangePartitionBound(bound.upper_bound().bound_type(),
+                                                &upper_bound_type));
+    }
+    table_creator->add_range_partition(lower_bound.release(), upper_bound.release(),
+                                       lower_bound_type, upper_bound_type);
+  }
+  for (const auto& split_pb : range_partition.range_splits()) {
+    RETURN_NOT_OK(ToJsonPartialRow(split_pb.split_values(),
+                                   range_col_names_and_types,
+                                   &bound_partial_row_json));
+    unique_ptr<KuduPartialRow> split(kudu_schema.NewRow());
+    RETURN_NOT_OK(ConvertToKuduPartialRow(range_col_names_and_types,
+                                          bound_partial_row_json,
+                                          split.get()));
+    table_creator->add_range_partition_split(split.release());
+  }
+  return Status::OK();
+}
+
+// Implements 'kudu table create': decodes the CreateTablePB passed as JSON on
+// the command line and creates the described table on the cluster.
+Status CreateTable(const RunnerContext& context) {
+  const string& json_str = FindOrDie(context.required_args, kCreateTableJSONArg);
+  CreateTablePB req;
+  const auto& parse_status = google::protobuf::util::JsonStringToMessage(json_str, &req);
+  if (!parse_status.ok()) {
+    return Status::InvalidArgument(Substitute("unable to parse JSON: $0", json_str),
+                                   parse_status.error_message().ToString());
+  }
+
+  client::sp::shared_ptr<KuduClient> client;
+  RETURN_NOT_OK(CreateKuduClient(context, &client));
+
+  KuduSchema schema;
+  RETURN_NOT_OK(ParseTableSchema(req.schema(), &schema));
+
+  unique_ptr<KuduTableCreator> creator(client->NewTableCreator());
+  creator->table_name(req.table_name());
+  creator->schema(&schema);
+  RETURN_NOT_OK(ParseTablePartition(req.partition(), schema, creator.get()));
+
+  // Optional settings are forwarded only when present in the request.
+  if (req.has_num_replicas()) {
+    creator->num_replicas(req.num_replicas());
+  }
+  if (req.has_extra_configs()) {
+    const auto& configs = req.extra_configs().configs();
+    creator->extra_configs(map<string, string>(configs.begin(), configs.end()));
+  }
+  if (req.has_dimension_label()) {
+    creator->dimension_label(req.dimension_label());
+  }
+  return creator->Create();
+}
+
+
} // anonymous namespace
unique_ptr<Mode> BuildTableMode() {
@@ -989,6 +1239,7 @@ unique_ptr<Mode> BuildTableMode() {
"Name of the table for which to get extra
configurations" })
.AddOptionalParameter("config_names")
.Build();
+
unique_ptr<Action> drop_range_partition =
ActionBuilder("drop_range_partition", &DropRangePartition)
.Description("Drop a range partition of table")
@@ -1077,6 +1328,7 @@ unique_ptr<Mode> BuildTableMode() {
.AddRequiredParameter({ kColumnNameArg, "Name of the table column to
delete" })
.Build();
+
unique_ptr<Action> statistics =
ActionBuilder("statistics", &GetTableStatistics)
.Description("Get table statistics")
@@ -1084,26 +1336,48 @@ unique_ptr<Mode> BuildTableMode() {
.AddRequiredParameter({ kTableNameArg, "Name of the table to get
statistics" })
.Build();
+ unique_ptr<Action> create_table =
+ ActionBuilder("create", &CreateTable)
+ .Description("Create a new table")
+ .ExtraDescription("Provide the table-build statements as a JSON object,
e.g."
+
"'{\"table_name\":\"test\",\"schema\":{\"columns\":[{\"column_name"
+
"\":\"id\",\"column_type\":\"INT32\",\"default_value\":\"1\"},{"
+
"\"column_name\":\"key\",\"column_type\":\"INT64\",\"is_nullable\""
+ ":false,\"comment\":\"range
key\"},{\"column_name\":\"name\",\""
+
"column_type\":\"STRING\",\"is_nullable\":false,\"comment\":\""
+ "user name\"}],\"key_column_names\":[\"id\",
\"key\"]},\"partition\""
+
":{\"hash_partitions\":[{\"columns\":[\"id\"],\"num_buckets\":2,\"seed"
+
"\":100}],\"range_partition\":{\"columns\":[\"key\"],\"range_bounds\":"
+
"[{\"upper_bound\":{\"bound_type\":\"inclusive\",\"bound_values\":[\"2"
+ "\"]}},{\"lower_bound\":
{\"bound_type\":\"exclusive\",\"bound_values"
+ "\":
[\"2\"]},\"upper_bound\":{\"bound_type\":\"inclusive\",\""
+
"bound_values\":[\"3\"]}}]}},\"extra_configs\":{\"configs\":{\""
+
"kudu.table.history_max_age_sec\":\"3600\"}},\"num_replicas\":3}'.")
+ .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
+ .AddRequiredParameter({ kCreateTableJSONArg, "JSON object for creating
table" })
+ .Build();
+
return ModeBuilder("table")
.Description("Operate on Kudu tables")
.AddAction(std::move(add_range_partition))
- .AddAction(std::move(column_set_default))
.AddAction(std::move(column_remove_default))
+ .AddAction(std::move(column_set_block_size))
.AddAction(std::move(column_set_compression))
+ .AddAction(std::move(column_set_default))
.AddAction(std::move(column_set_encoding))
- .AddAction(std::move(column_set_block_size))
+ .AddAction(std::move(copy_table))
+ .AddAction(std::move(create_table))
.AddAction(std::move(delete_column))
.AddAction(std::move(delete_table))
.AddAction(std::move(describe_table))
.AddAction(std::move(drop_range_partition))
+ .AddAction(std::move(get_extra_configs))
.AddAction(std::move(list_tables))
.AddAction(std::move(locate_row))
.AddAction(std::move(rename_column))
.AddAction(std::move(rename_table))
.AddAction(std::move(scan_table))
- .AddAction(std::move(copy_table))
.AddAction(std::move(set_extra_config))
- .AddAction(std::move(get_extra_configs))
.AddAction(std::move(statistics))
.Build();
}