This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 586b791  [client] KUDU-2671 custom hash buckets API for table creation
586b791 is described below

commit 586b7913258df2d0ee75470ddfb2b88d472ba235
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Jun 7 17:22:31 2021 -0700

    [client] KUDU-2671 custom hash buckets API for table creation
    
    This patch introduces changes in the Kudu C++ client API to make it
    possible to create a Kudu table with custom hash bucket schemas per
    range partition.
    
    This patch doesn't contain the rest of functionality required to make
    range partitions with custom hash bucket schemas fully functional. This
    is rather a patch focusing on the API side only.  The missing pieces will
    be addressed in follow-up patches:
    
      * update PartitionSchema to properly encode range keys in case
        where range partitions with custom hash bucket schemas are present
        (i.e. update PartitionSchema::EncodeKeyImpl() correspondingly)
    
      * update the meta-cache to work with partition range keys built for
        tables containing range partitions with custom hash bucket schema
    
      * update other places in client code which are dependent on
        PartitionPruner doing proper processing of partition key ranges for
        tables containing range partitions with custom hash bucket schema
    
      * add checks at the server side to verify that the columns used for
        custom hash bucket schemas are part of the primary key
    
      * add provisions to allow for plain (i.e. without any hash
        sub-partitioning) custom range partitions for tables with
        table-wide hash bucket schema
    
      * add end-to-end tests to verify the proper distribution of inserted
        rows among range partitions and among their hash buckets
    
    I also added test coverage to verify the newly introduced functionality
    to some extent, namely making sure the appropriate number of tablets
    is created for tables with custom hash bucket schemas per range, adding
    TODOs where full end-to-end coverage isn't yet available due to missing
    functionality outlined above.
    
    Change-Id: I98fd9754db850dcdd00a00738f470673f42ac5b4
    Reviewed-on: http://gerrit.cloudera.org:8080/17657
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/client/CMakeLists.txt                   |   1 +
 src/kudu/client/client-test.cc                   |   1 +
 src/kudu/client/client.cc                        | 123 +++++--
 src/kudu/client/client.h                         |  74 +++-
 src/kudu/client/flex_partitioning_client-test.cc | 420 +++++++++++++++++++++++
 src/kudu/client/meta_cache.cc                    |  16 +-
 src/kudu/client/partitioner-internal.cc          |   2 +-
 src/kudu/client/table_creator-internal.cc        |  44 ++-
 src/kudu/client/table_creator-internal.h         |  58 +++-
 src/kudu/common/partition.cc                     |   8 +
 src/kudu/master/master.proto                     |  17 +-
 11 files changed, 695 insertions(+), 69 deletions(-)

diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index 0681d75..bf23d6c 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -275,5 +275,6 @@ SET_KUDU_TEST_LINK_LIBS(
 ADD_KUDU_TEST(client-test NUM_SHARDS 8 PROCESSORS 2
                           DATA_FILES ../scripts/first_argument.sh)
 ADD_KUDU_TEST(client-unittest)
+ADD_KUDU_TEST(flex_partitioning_client-test)
 ADD_KUDU_TEST(predicate-test NUM_SHARDS 4)
 ADD_KUDU_TEST(scan_token-test PROCESSORS 2)
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index e6387c7..1313516 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -134,6 +134,7 @@ DECLARE_bool(allow_unsafe_replication_factor);
 DECLARE_bool(catalog_manager_support_live_row_count);
 DECLARE_bool(catalog_manager_support_on_disk_size);
 DECLARE_bool(client_use_unix_domain_sockets);
+DECLARE_bool(enable_per_range_hash_schemas);
 DECLARE_bool(enable_txn_system_client_init);
 DECLARE_bool(fail_dns_resolution);
 DECLARE_bool(location_mapping_by_uuid);
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index fc91ec2..280f17b 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -811,9 +811,10 @@ KuduTableCreator& 
KuduTableCreator::add_hash_partitions(const vector<string>& co
 }
 
 KuduTableCreator& KuduTableCreator::add_hash_partitions(const vector<string>& 
columns,
-                                                        int32_t num_buckets, 
int32_t seed) {
+                                                        int32_t num_buckets,
+                                                        int32_t seed) {
   PartitionSchemaPB::HashBucketSchemaPB* bucket_schema =
-    data_->partition_schema_.add_hash_bucket_schemas();
+      data_->partition_schema_.add_hash_bucket_schemas();
   for (const string& col_name : columns) {
     bucket_schema->add_columns()->set_name(col_name);
   }
@@ -855,16 +856,20 @@ KuduTableCreator& KuduTableCreator::split_rows(const 
vector<const KuduPartialRow
   return *this;
 }
 
-KuduTableCreator& KuduTableCreator::add_range_partition(KuduPartialRow* 
lower_bound,
-                                                        KuduPartialRow* 
upper_bound,
-                                                        RangePartitionBound 
lower_bound_type,
-                                                        RangePartitionBound 
upper_bound_type) {
-  data_->range_partition_bounds_.push_back({
-      unique_ptr<KuduPartialRow>(lower_bound),
-      unique_ptr<KuduPartialRow>(upper_bound),
-      lower_bound_type,
-      upper_bound_type,
-  });
+KuduTableCreator& KuduTableCreator::add_range_partition(
+    KuduPartialRow* lower_bound,
+    KuduPartialRow* upper_bound,
+    RangePartitionBound lower_bound_type,
+    RangePartitionBound upper_bound_type) {
+  data_->range_partitions_.emplace_back(new KuduRangePartition(
+      lower_bound, upper_bound, lower_bound_type, upper_bound_type));
+  return *this;
+}
+
+KuduTableCreator& KuduTableCreator::add_custom_range_partition(
+    KuduRangePartition* partition) {
+  CHECK(partition);
+  data_->range_partitions_.emplace_back(partition);
   return *this;
 }
 
@@ -909,7 +914,6 @@ Status KuduTableCreator::Create() {
 
   // Build request.
   CreateTableRequestPB req;
-  CreateTableResponsePB resp;
   req.set_name(data_->table_name_);
   if (data_->num_replicas_ != boost::none) {
     req.set_num_replicas(data_->num_replicas_.get());
@@ -932,31 +936,66 @@ Status KuduTableCreator::Create() {
                         "Invalid schema");
 
   RowOperationsPBEncoder encoder(req.mutable_split_rows_range_bounds());
-
+  bool has_range_splits = false;
   for (const auto& row : data_->range_partition_splits_) {
     if (!row) {
       return Status::InvalidArgument("range split row must not be null");
     }
     encoder.Add(RowOperationsPB::SPLIT_ROW, *row);
+    has_range_splits = true;
   }
 
-  for (const auto& bound : data_->range_partition_bounds_) {
-    if (!bound.lower_bound || !bound.upper_bound) {
+  // A preliminary pass over the ranges is here to check if any custom hash
+  // partitioning schemas are present.
+  bool has_range_with_custom_hash_schema = false;
+  for (const auto& p : data_->range_partitions_) {
+    if (!p->data_->hash_bucket_schemas_.empty()) {
+      has_range_with_custom_hash_schema = true;
+      break;
+    }
+  }
+
+  if (has_range_splits && has_range_with_custom_hash_schema) {
+    // For simplicity, don't allow having both range splits (deprecated) and
+    // custom hash bucket schemas per range partition.
+    return Status::InvalidArgument(
+        "split rows and custom hash bucket schemas for ranges are 
incompatible: "
+        "choose one or the other");
+  }
+
+  for (const auto& p : data_->range_partitions_) {
+    const auto* range = p->data_;
+    if (!range->lower_bound_ || !range->upper_bound_) {
       return Status::InvalidArgument("range bounds must not be null");
     }
 
     RowOperationsPB_Type lower_bound_type =
-      bound.lower_bound_type == KuduTableCreator::INCLUSIVE_BOUND ?
-      RowOperationsPB::RANGE_LOWER_BOUND :
-      RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND;
+        range->lower_bound_type_ == KuduTableCreator::INCLUSIVE_BOUND
+        ? RowOperationsPB::RANGE_LOWER_BOUND
+        : RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND;
 
     RowOperationsPB_Type upper_bound_type =
-      bound.upper_bound_type == KuduTableCreator::EXCLUSIVE_BOUND ?
-      RowOperationsPB::RANGE_UPPER_BOUND :
-      RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND;
-
-    encoder.Add(lower_bound_type, *bound.lower_bound);
-    encoder.Add(upper_bound_type, *bound.upper_bound);
+        range->upper_bound_type_ == KuduTableCreator::EXCLUSIVE_BOUND
+        ? RowOperationsPB::RANGE_UPPER_BOUND
+        : RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND;
+
+    encoder.Add(lower_bound_type, *range->lower_bound_);
+    encoder.Add(upper_bound_type, *range->upper_bound_);
+
+    if (has_range_with_custom_hash_schema) {
+      // Populate corresponding element in 'range_hash_schemas' if there is at
+      // least one range with custom hash partitioning schema.
+      auto* schemas_pb = req.add_range_hash_schemas();
+      for (const auto& schema : range->hash_bucket_schemas_) {
+        auto* pb = schemas_pb->add_hash_schemas();
+        pb->set_seed(schema.seed);
+        pb->set_num_buckets(schema.num_buckets);
+        for (const auto& column_name : schema.column_names) {
+          auto* column_id = pb->add_columns();
+          column_id->set_name(column_name);
+        }
+      }
+    }
   }
 
   req.mutable_partition_schema()->CopyFrom(data_->partition_schema_);
@@ -971,26 +1010,42 @@ Status KuduTableCreator::Create() {
   } else {
     deadline += data_->client_->default_admin_operation_timeout();
   }
-  RETURN_NOT_OK_PREPEND(data_->client_->data_->CreateTable(data_->client_,
-                                                           req,
-                                                           &resp,
-                                                           deadline,
-                                                           
!data_->range_partition_bounds_.empty()),
+
+  CreateTableResponsePB resp;
+  RETURN_NOT_OK_PREPEND(data_->client_->data_->CreateTable(
+      data_->client_, req, &resp, deadline, !data_->range_partitions_.empty()),
                         Substitute("Error creating table $0 on the master",
                                    data_->table_name_));
-
   // Spin until the table is fully created, if requested.
   if (data_->wait_) {
     TableIdentifierPB table;
     table.set_table_id(resp.table_id());
-    
RETURN_NOT_OK(data_->client_->data_->WaitForCreateTableToFinish(data_->client_,
-                                                                    table,
-                                                                    deadline));
+    RETURN_NOT_OK(data_->client_->data_->WaitForCreateTableToFinish(
+        data_->client_, table, deadline));
   }
 
   return Status::OK();
 }
 
+KuduTableCreator::KuduRangePartition::KuduRangePartition(
+    KuduPartialRow* lower_bound,
+    KuduPartialRow* upper_bound,
+    RangePartitionBound lower_bound_type,
+    RangePartitionBound upper_bound_type)
+    : data_(new Data(lower_bound, upper_bound, lower_bound_type, 
upper_bound_type)) {
+}
+
+KuduTableCreator::KuduRangePartition::~KuduRangePartition() {
+  delete data_;
+}
+
+Status KuduTableCreator::KuduRangePartition::add_hash_partitions(
+    const vector<string>& columns,
+    int32_t num_buckets,
+    int32_t seed) {
+  return data_->add_hash_partitions(columns, num_buckets, seed);
+}
+
 ////////////////////////////////////////////////////////////
 // KuduTableStatistics
 ////////////////////////////////////////////////////////////
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index da8c26f..3ff9141 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1145,7 +1145,8 @@ class KUDU_EXPORT KuduTableCreator {
   ///   Hash: seed for mapping rows to hash buckets.
   /// @return Reference to the modified table creator.
   KuduTableCreator& add_hash_partitions(const std::vector<std::string>& 
columns,
-                                        int32_t num_buckets, int32_t seed);
+                                        int32_t num_buckets,
+                                        int32_t seed);
 
   /// Set the columns on which the table will be range-partitioned.
   ///
@@ -1167,6 +1168,63 @@ class KUDU_EXPORT KuduTableCreator {
     INCLUSIVE_BOUND, ///< An inclusive bound.
   };
 
+  /// A helper class to represent a Kudu range partition with a custom hash
+  /// bucket schema. The hash sub-partitioning for a range partition might be
+  /// different from the default table-wide hash bucket schema specified during
+  /// a table's creation (see KuduTableCreator::add_hash_partitions()).
+  /// Correspondingly, this class provides a means to specify a custom hash
+  /// bucket structure for the data in a range partition.
+  class KuduRangePartition {
+   public:
+    /// Create an object representing the range defined by the given 
parameters.
+    ///
+    /// @param [in] lower_bound
+    ///   The lower bound for the range.
+    ///   The KuduRangePartition object takes ownership of the parameter.
+    /// @param [in] upper_bound
+    ///   The upper bound for the range.
+    ///   The KuduRangePartition object takes ownership of the parameter.
+    /// @param [in] lower_bound_type
+    ///   The type of the lower_bound: inclusive or exclusive; inclusive if the
+    ///   parameter is omitted.
+    /// @param [in] upper_bound_type
+    ///   The type of the upper_bound: inclusive or exclusive; exclusive if the
+    ///   parameter is omitted.
+    KuduRangePartition(KuduPartialRow* lower_bound,
+                       KuduPartialRow* upper_bound,
+                       RangePartitionBound lower_bound_type = INCLUSIVE_BOUND,
+                       RangePartitionBound upper_bound_type = EXCLUSIVE_BOUND);
+
+    ~KuduRangePartition();
+
+    /// Add an extra level of hash partitioning for this range partition.
+    ///
+    /// The newly added hash partitioning level is defined by its hash bucket
+    /// schema. The hash bucket schema is specified by the parameters of this
+    /// method. A range partition can have multiple levels of hash 
partitioning,
+    /// i.e. this method can be called multiple times to establish a
+    /// multi-dimensional hash bucket structure for the range partition.
+    ///
+    /// @param [in] columns
+    ///   Names of columns to use for partitioning.
+    /// @param [in] num_buckets
+    ///   Number of buckets for the hashing.
+    /// @param [in] seed
+    ///   Hash seed for mapping rows to hash buckets.
+    /// @return Operation result status.
+    Status add_hash_partitions(const std::vector<std::string>& columns,
+                               int32_t num_buckets,
+                               int32_t seed = 0);
+   private:
+    class KUDU_NO_EXPORT Data;
+    friend class KuduTableCreator;
+
+    // Owned.
+    Data* data_;
+
+    DISALLOW_COPY_AND_ASSIGN(KuduRangePartition);
+  };
+
   /// Add a range partition to the table.
   ///
   /// Multiple range partitions may be added, but they must not overlap. All
@@ -1200,6 +1258,20 @@ class KUDU_EXPORT KuduTableCreator {
                                         RangePartitionBound lower_bound_type = 
INCLUSIVE_BOUND,
                                         RangePartitionBound upper_bound_type = 
EXCLUSIVE_BOUND);
 
+  /// Add a range partition with a custom hash bucket schema.
+  ///
+  /// This method allows adding a range partition whose hash partitioning
+  /// schema differs from the table-wide one specified via
+  /// KuduTableCreator::add_hash_partitions().
+  ///
+  /// @warning This functionality isn't fully implemented yet.
+  ///
+  /// @param [in] partition
+  ///   Range partition with custom hash bucket schema.
+  ///   The KuduTableCreator object takes ownership of the parameter.
+  /// @return Reference to the modified table creator.
+  KuduTableCreator& add_custom_range_partition(
+      KuduRangePartition* partition);
+
   /// Add a range partition split at the provided row.
   ///
   /// @param [in] split_row
diff --git a/src/kudu/client/flex_partitioning_client-test.cc 
b/src/kudu/client/flex_partitioning_client-test.cc
new file mode 100644
index 0000000..35e9c52
--- /dev/null
+++ b/src/kudu/client/flex_partitioning_client-test.cc
@@ -0,0 +1,420 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/write_op.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(enable_per_range_hash_schemas);
+DECLARE_int32(heartbeat_interval_ms);
+
+using kudu::cluster::InternalMiniCluster;
+using kudu::cluster::InternalMiniClusterOptions;
+using kudu::master::CatalogManager;
+using kudu::client::sp::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace client {
+
+class FlexPartitioningTest : public KuduTest {
+ public:
+  static constexpr const char* const kKeyColumn = "key";
+  static constexpr const char* const kIntValColumn = "int_val";
+  static constexpr const char* const kStringValColumn = "string_val";
+
+  FlexPartitioningTest() {
+    KuduSchemaBuilder b;
+    b.AddColumn("key")->
+        Type(KuduColumnSchema::INT32)->
+        NotNull()->
+        PrimaryKey();
+    b.AddColumn("int_val")->
+        Type(KuduColumnSchema::INT32)->
+        NotNull();
+    b.AddColumn("string_val")->
+        Type(KuduColumnSchema::STRING)->
+        Nullable();
+    CHECK_OK(b.Build(&schema_));
+  }
+
+  void SetUp() override {
+    KuduTest::SetUp();
+
+    // Reduce the TS<->Master heartbeat interval to speed up testing.
+    FLAGS_heartbeat_interval_ms = 10;
+
+    // Explicitly enable support for custom hash schemas per range partition.
+    FLAGS_enable_per_range_hash_schemas = true;
+
+    // Start minicluster and wait for tablet servers to connect to master.
+    cluster_.reset(new InternalMiniCluster(env_, 
InternalMiniClusterOptions()));
+    ASSERT_OK(cluster_->Start());
+
+    // Connect to the cluster.
+    ASSERT_OK(KuduClientBuilder()
+        .add_master_server_addr(
+            cluster_->mini_master()->bound_rpc_addr().ToString())
+        .Build(&client_));
+  }
+
+ protected:
+  typedef unique_ptr<KuduTableCreator::KuduRangePartition> RangePartition;
+  typedef vector<RangePartition> RangePartitions;
+
+  static Status ApplyInsert(KuduSession* session,
+                            const shared_ptr<KuduTable>& table,
+                            int32_t key_val,
+                            int32_t int_val,
+                            string string_val) {
+    unique_ptr<KuduInsert> insert(table->NewInsert());
+    RETURN_NOT_OK(insert->mutable_row()->SetInt32(kKeyColumn, key_val));
+    RETURN_NOT_OK(insert->mutable_row()->SetInt32(kIntValColumn, int_val));
+    RETURN_NOT_OK(insert->mutable_row()->SetStringCopy(
+        kStringValColumn, std::move(string_val)));
+    return session->Apply(insert.release());
+  }
+
+  Status InsertTestRows(
+      const char* table_name,
+      int32_t key_beg,
+      int32_t key_end,
+      KuduSession::FlushMode flush_mode = KuduSession::AUTO_FLUSH_SYNC) {
+    CHECK_LE(key_beg, key_end);
+    shared_ptr<KuduTable> table;
+    RETURN_NOT_OK(client_->OpenTable(table_name, &table));
+    shared_ptr<KuduSession> session = client_->NewSession();
+    RETURN_NOT_OK(session->SetFlushMode(flush_mode));
+    session->SetTimeoutMillis(60000);
+    for (int32_t key_val = key_beg; key_val < key_end; ++key_val) {
+      RETURN_NOT_OK(ApplyInsert(
+          session.get(), table, key_val, rand(), std::to_string(rand())));
+    }
+    return session->Flush();
+  }
+
+  Status CreateTable(const char* table_name, RangePartitions partitions) {
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    table_creator->table_name(table_name)
+        .schema(&schema_)
+        .num_replicas(1)
+        .set_range_partition_columns({ kKeyColumn });
+
+    for (auto& p : partitions) {
+      table_creator->add_custom_range_partition(p.release());
+    }
+
+    return table_creator->Create();
+  }
+
+  RangePartition CreateRangePartition(int32_t lower_boundary = 0,
+                                      int32_t upper_boundary = 100) {
+    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+    CHECK_OK(lower->SetInt32(kKeyColumn, lower_boundary));
+    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+    CHECK_OK(upper->SetInt32(kKeyColumn, upper_boundary));
+    return unique_ptr<KuduTableCreator::KuduRangePartition>(
+        new KuduTableCreator::KuduRangePartition(lower.release(),
+                                                 upper.release()));
+  }
+
+  void CheckTabletCount(const char* table_name, int expected_count) {
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client_->OpenTable(table_name, &table));
+
+    scoped_refptr<master::TableInfo> table_info;
+    {
+      auto* cm = cluster_->mini_master(0)->master()->catalog_manager();
+      CatalogManager::ScopedLeaderSharedLock l(cm);
+      ASSERT_OK(cm->GetTableInfo(table->id(), &table_info));
+    }
+    ASSERT_EQ(expected_count, table_info->num_tablets());
+  }
+
+  KuduSchema schema_;
+  unique_ptr<InternalMiniCluster> cluster_;
+  shared_ptr<KuduClient> client_;
+};
+
+// Test for scenarios covering range partitioning with custom bucket schemas
+// specified when creating a table.
+class FlexPartitioningCreateTableTest : public FlexPartitioningTest {};
+
+// Create tables with range partitions using custom hash bucket schemas only.
+//
+// TODO(aserbin): turn the sub-scenarios with non-primary-key columns for
+//                custom hash buckets into negative ones after proper
+//                checks are added at the server side
+// TODO(aserbin): add verification based on PartitionSchema provided by
+//                KuduTable::partition_schema() once PartitionPruner
+//                recognizes custom hash bucket schema for ranges
+// TODO(aserbin): add InsertTestRows() when proper key encoding is implemented
+TEST_F(FlexPartitioningCreateTableTest, CustomHashBuckets) {
+  // One-level hash bucket structure: { 3, "key" }.
+  {
+    constexpr const char* const kTableName = "3@key";
+    RangePartitions partitions;
+    partitions.emplace_back(CreateRangePartition());
+    auto& p = partitions.back();
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 0));
+    ASSERT_OK(CreateTable(kTableName, std::move(partitions)));
+    NO_FATALS(CheckTabletCount(kTableName, 3));
+  }
+
+  // One-level hash bucket structure with hashing on non-key column only:
+  // { 3, "int_val" }.
+  {
+    constexpr const char* const kTableName = "3@int_val";
+    RangePartitions partitions;
+    partitions.emplace_back(CreateRangePartition());
+    auto& p = partitions.back();
+    ASSERT_OK(p->add_hash_partitions({ kIntValColumn }, 2, 0));
+    ASSERT_OK(CreateTable(kTableName, std::move(partitions)));
+    NO_FATALS(CheckTabletCount(kTableName, 2));
+  }
+
+  // One-level hash bucket structure with hashing on non-key nullable column:
+  // { 5, "string_val" }.
+  {
+    constexpr const char* const kTableName = "3@string_val";
+    RangePartitions partitions;
+    partitions.emplace_back(CreateRangePartition());
+    auto& p = partitions.back();
+    ASSERT_OK(p->add_hash_partitions({ kStringValColumn }, 5, 0));
+    ASSERT_OK(CreateTable(kTableName, std::move(partitions)));
+    NO_FATALS(CheckTabletCount(kTableName, 5));
+  }
+
+  // Two-level hash bucket structure: { 3, "key" } x { 3, "key" }.
+  {
+    constexpr const char* const kTableName = "3@key_x_3@key";
+    RangePartitions partitions;
+    partitions.emplace_back(CreateRangePartition());
+    auto& p = partitions.back();
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 0));
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 1));
+    ASSERT_OK(CreateTable(kTableName, std::move(partitions)));
+    NO_FATALS(CheckTabletCount(kTableName, 9));
+  }
+
+  // Two-level hash bucket structure: { 2, "key" } x { 3, "int_val" }.
+  {
+    constexpr const char* const kTableName = "2@key_x_3@int_val";
+    RangePartitions partitions;
+    partitions.emplace_back(CreateRangePartition());
+    auto& p = partitions.back();
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 2, 0));
+    ASSERT_OK(p->add_hash_partitions({ kIntValColumn }, 3, 1));
+    ASSERT_OK(CreateTable(kTableName, std::move(partitions)));
+    NO_FATALS(CheckTabletCount(kTableName, 6));
+  }
+
+  // Two-level hash bucket structure: { 3, "key" } x { 2, "key", "int_val" }.
+  {
+    constexpr const char* const kTableName = "3@key_x_2@key:int_val";
+    RangePartitions partitions;
+    partitions.emplace_back(CreateRangePartition());
+    auto& p = partitions.back();
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 0));
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn, kIntValColumn }, 2, 1));
+    ASSERT_OK(CreateTable(kTableName, std::move(partitions)));
+    NO_FATALS(CheckTabletCount(kTableName, 6));
+  }
+}
+
+// Create a table with mixed set of range partitions, using both table-wide and
+// custom hash bucket schemas.
+//
+// TODO(aserbin): add verification based on PartitionSchema provided by
+//                KuduTable::partition_schema() once PartitionPruner
+//                recognizes custom hash bucket schema for ranges
+// TODO(aserbin): add InsertTestRows() when proper key encoding is implemented
+TEST_F(FlexPartitioningCreateTableTest, DefaultAndCustomHashBuckets) {
+  // Create a table with the following partitions:
+  //
+  //            hash bucket
+  //   key    0               1               2               3
+  //         --------------------------------------------------------------
+  //  <111    x:{key}         x:{key}         -               -
+  // 111-222  x:{key}         x:{key}         x:{key}         -
+  // 222-333  x:{int_val}     x:{int_val}     x:{int_val}     x:{int_val}
+  // 333-444  x:{key,int_val} x:{key,int_val} -               -
+  constexpr const char* const kTableName = "DefaultAndCustomHashBuckets";
+
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  table_creator->table_name(kTableName)
+      .schema(&schema_)
+      .num_replicas(1)
+      .add_hash_partitions({ kKeyColumn }, 2)
+      .set_range_partition_columns({ kKeyColumn });
+
+  // Add a range partition with the table-wide hash partitioning rules.
+  {
+    unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+    ASSERT_OK(lower->SetInt32(kKeyColumn, INT32_MIN));
+    unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+    ASSERT_OK(upper->SetInt32(kKeyColumn, 111));
+    table_creator->add_range_partition(lower.release(), upper.release());
+  }
+
+  // Add a range partition with custom hash sub-partitioning rules:
+  // 3 buckets with hash based on the "key" column with hash seed 1.
+  {
+    auto p = CreateRangePartition(111, 222);
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 3, 1));
+    table_creator->add_custom_range_partition(p.release());
+  }
+
+  // Add a range partition with custom hash sub-partitioning rules:
+  // 4 buckets with hash based on the "int_val" column with hash seed 2.
+  {
+    auto p = CreateRangePartition(222, 333);
+    ASSERT_OK(p->add_hash_partitions({ kIntValColumn }, 4, 2));
+    table_creator->add_custom_range_partition(p.release());
+  }
+
+  // Add a range partition with custom hash sub-partitioning rules:
+  // 2 buckets hashing on the { "key", "int_val" } columns with hash seed 3.
+  {
+    auto p = CreateRangePartition(333, 444);
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn, kIntValColumn }, 2, 3));
+    table_creator->add_custom_range_partition(p.release());
+  }
+
+  ASSERT_OK(table_creator->Create());
+  NO_FATALS(CheckTabletCount(kTableName, 11));
+
+  // Make sure it's possible to insert rows into the table.
+  //ASSERT_OK(InsertTestRows(kTableName, 111, 444));
+}
+
+// Negative tests scenarios to cover non-OK status codes for various operations
+// related to custom hash bucket schema per range.
+TEST_F(FlexPartitioningCreateTableTest, Negatives) {
+  // Try adding hash partitions on an empty set of columns.
+  {
+    auto p = CreateRangePartition();
+    const auto s = p->add_hash_partitions({}, 2, 0);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_STR_CONTAINS(
+        s.ToString(), "set of columns for hash partitioning must not be 
empty");
+  }
+
+  // Try adding hash partitions with just one bucket.
+  {
+    auto p = CreateRangePartition();
+    const auto s = p->add_hash_partitions({ kKeyColumn }, 1, 0);
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_STR_CONTAINS(
+        s.ToString(),
+        "at least two buckets are required to establish hash partitioning");
+  }
+
+  // Try adding hash partition on a non-existent column: appropriate error
+  // surfaces during table creation.
+  {
+    RangePartitions partitions;
+    partitions.emplace_back(CreateRangePartition());
+    auto& p = partitions.back();
+    ASSERT_OK(p->add_hash_partitions({ "nonexistent" }, 2, 0));
+    const auto s = CreateTable("nicetry", std::move(partitions));
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "unknown column: name: \"nonexistent\"");
+  }
+
+  // Try creating a table where both range splits and a custom hash bucket
+  // schema per partition are specified -- that should not be possible.
+  {
+    RangePartition p(CreateRangePartition(0, 100));
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 2, 0));
+
+    unique_ptr<KuduPartialRow> split(schema_.NewRow());
+    ASSERT_OK(split->SetInt32(kKeyColumn, 50));
+
+    unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
+    creator->table_name("nicetry")
+        .schema(&schema_)
+        .num_replicas(1)
+        .set_range_partition_columns({ kKeyColumn })
+        .add_range_partition_split(split.release())
+        .add_custom_range_partition(p.release());
+
+    const auto s = creator->Create();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_STR_CONTAINS(
+        s.ToString(),
+        "split rows and custom hash bucket schemas for ranges are 
incompatible: "
+        "choose one or the other");
+  }
+
+  // Same as the sub-scenario above, but using deprecated client API to specify
+  // so-called split rows.
+  {
+    RangePartition p(CreateRangePartition(0, 100));
+    ASSERT_OK(p->add_hash_partitions({ kKeyColumn }, 2, 0));
+
+    unique_ptr<KuduPartialRow> split_row(schema_.NewRow());
+    ASSERT_OK(split_row->SetInt32(kKeyColumn, 50));
+
+    unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
+    creator->table_name("nicetry")
+        .schema(&schema_)
+        .num_replicas(1)
+        .set_range_partition_columns({ kKeyColumn })
+        .add_custom_range_partition(p.release());
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+    creator->split_rows({ split_row.release() });
+#pragma GCC diagnostic pop
+
+    const auto s = creator->Create();
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_STR_CONTAINS(
+        s.ToString(),
+        "split rows and custom hash bucket schemas for ranges are 
incompatible: "
+        "choose one or the other");
+  }
+}
+
+} // namespace client
+} // namespace kudu
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index fdbfe08..70fdc36 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -950,18 +950,16 @@ Status MetaCache::ProcessLookupResponse(const LookupRpc& rpc,
   VLOG(2) << "Processing master response for " << rpc.ToString()
           << ". Response: " << pb_util::SecureShortDebugString(rpc.resp());
 
-  if (rpc.resp().tablet_locations().empty()) {
-    // If there are no tablets in the response, then the table is empty. If
-    // there were any tablets in the table they would have been returned, since
-    // the master guarantees that if the partition key falls in a non-covered
-    // range, the previous tablet will be returned, and we did not set an upper
-    // bound partition key on the request.
-    DCHECK(!rpc.req().has_partition_key_end());
-  }
+  // If there are no tablets in the response, then the table is empty. If
+  // there were any tablets in the table they would have been returned, since
+  // the master guarantees that if the partition key falls in a non-covered
+  // range, the previous tablet will be returned, and we did not set an upper
+  // bound partition key on the request.
+  DCHECK(!rpc.resp().tablet_locations().empty() ||
+         !rpc.req().has_partition_key_end());
 
   return ProcessGetTableLocationsResponse(rpc.table(), rpc.partition_key(), rpc.is_exact_lookup(),
       rpc.resp(), cache_entry, max_returned_locations);
-
 }
 
 Status MetaCache::ProcessGetTabletLocationsResponse(const string& tablet_id,
diff --git a/src/kudu/client/partitioner-internal.cc b/src/kudu/client/partitioner-internal.cc
index 9b89d2e..09ba399 100644
--- a/src/kudu/client/partitioner-internal.cc
+++ b/src/kudu/client/partitioner-internal.cc
@@ -39,7 +39,7 @@ namespace client {
 Status KuduPartitionerBuilder::Data::Build(KuduPartitioner** partitioner) {
   // If any of the builder calls had generated a bad status, then return it here.
   RETURN_NOT_OK(status_);
-  unique_ptr<KuduPartitioner::Data> ret_data(new KuduPartitioner::Data());
+  unique_ptr<KuduPartitioner::Data> ret_data(new KuduPartitioner::Data);
 
   auto deadline = MonoTime::Now() + timeout_;
   auto mc = table_->client()->data_->meta_cache_;
diff --git a/src/kudu/client/table_creator-internal.cc b/src/kudu/client/table_creator-internal.cc
index 995fb34..7bfbddd 100644
--- a/src/kudu/client/table_creator-internal.cc
+++ b/src/kudu/client/table_creator-internal.cc
@@ -17,17 +17,51 @@
 
 #include "kudu/client/table_creator-internal.h"
 
-namespace kudu {
+#include <string>
+#include <vector>
+
+using std::string;
+using std::vector;
 
+namespace kudu {
 namespace client {
 
+// Initial state of a table-creation operation: the client handle is not
+// owned, no schema has been assigned yet, and wait_(true) presumably makes
+// Create() block until the new table is fully created -- TODO: confirm.
 KuduTableCreator::Data::Data(KuduClient* client)
-  : client_(client),
-    schema_(nullptr),
-    wait_(true) {
+    : client_(client),
+      schema_(nullptr),
+      wait_(true) {
 }
 
-KuduTableCreator::Data::~Data() {
+// Takes ownership of 'lower_bound' and 'upper_bound' (they are stored in
+// unique_ptr members); the bound types record how each endpoint of the
+// range is to be interpreted.
+KuduTableCreator::KuduRangePartition::Data::Data(
+    KuduPartialRow* lower_bound,
+    KuduPartialRow* upper_bound,
+    RangePartitionBound lower_bound_type,
+    RangePartitionBound upper_bound_type)
+    : lower_bound_type_(lower_bound_type),
+      upper_bound_type_(upper_bound_type),
+      lower_bound_(lower_bound),
+      upper_bound_(upper_bound) {
+}
+
+// Appends one dimension of hash sub-partitioning to this range partition.
+//
+// Returns InvalidArgument if 'column_names' is empty or 'num_buckets' is
+// less than two; otherwise records the (columns, buckets, seed) triple in
+// 'hash_bucket_schemas_' and returns OK.
+Status KuduTableCreator::KuduRangePartition::Data::add_hash_partitions(
+    const vector<string>& column_names,
+    int32_t num_buckets,
+    int32_t seed) {
+  if (column_names.empty()) {
+    return Status::InvalidArgument(
+        "set of columns for hash partitioning must not be empty");
+  }
+  if (num_buckets <= 1) {
+    return Status::InvalidArgument(
+        "at least two buckets are required to establish hash partitioning");
+  }
+
+  // It's totally fine to have multiple hash levels with same parameters,
+  // so there is no need to check for logical duplicates in the
+  // 'hash_bucket_schemas_' vector.
+  hash_bucket_schemas_.emplace_back(column_names, num_buckets, seed);
+
+  return Status::OK();
 }
 
 } // namespace client
diff --git a/src/kudu/client/table_creator-internal.h b/src/kudu/client/table_creator-internal.h
index ec75a29..d7fcb2f 100644
--- a/src/kudu/client/table_creator-internal.h
+++ b/src/kudu/client/table_creator-internal.h
@@ -14,12 +14,13 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_CLIENT_TABLE_CREATOR_INTERNAL_H
-#define KUDU_CLIENT_TABLE_CREATOR_INTERNAL_H
+#pragma once
 
+#include <cstdint>
 #include <map>
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -29,17 +30,31 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
 
 namespace kudu {
-
 namespace client {
 
 class KuduSchema;
 
+// One dimension of hash partitioning: the set of columns whose values are
+// hashed, the number of buckets to distribute rows into, and a seed for
+// the hash function.
+struct HashBucketSchema {
+  HashBucketSchema(std::vector<std::string> column_names,
+                   uint32_t num_buckets,
+                   int32_t seed)
+      : column_names(std::move(column_names)),
+        num_buckets(num_buckets),
+        seed(seed) {
+  }
+
+  // Names of the columns hashed by this dimension.
+  const std::vector<std::string> column_names;
+  // Number of hash buckets; callers are expected to supply at least 2
+  // (see KuduRangePartition::Data::add_hash_partitions()).
+  const uint32_t num_buckets;
+  // Seed value for the hash function.
+  const int32_t seed;
+};
+
 class KuduTableCreator::Data {
  public:
   explicit Data(KuduClient* client);
-  ~Data();
+  ~Data() = default;
 
   KuduClient* client_;
 
@@ -49,14 +64,7 @@ class KuduTableCreator::Data {
 
   std::vector<std::unique_ptr<KuduPartialRow>> range_partition_splits_;
 
-  struct RangePartition {
-    std::unique_ptr<KuduPartialRow> lower_bound;
-    std::unique_ptr<KuduPartialRow> upper_bound;
-    RangePartitionBound lower_bound_type;
-    RangePartitionBound upper_bound_type;
-  };
-
-  std::vector<RangePartition> range_partition_bounds_;
+  std::vector<std::unique_ptr<KuduRangePartition>> range_partitions_;
 
   PartitionSchemaPB partition_schema_;
 
@@ -80,7 +88,29 @@ class KuduTableCreator::Data {
   DISALLOW_COPY_AND_ASSIGN(Data);
 };
 
+// Internal state of KuduTableCreator::KuduRangePartition: the range's
+// lower/upper bounds (owned) with their bound types, plus any custom hash
+// bucket schemas defined for this particular range.
+class KuduTableCreator::KuduRangePartition::Data {
+ public:
+  // Takes ownership of 'lower_bound' and 'upper_bound'.
+  Data(KuduPartialRow* lower_bound,
+       KuduPartialRow* upper_bound,
+       RangePartitionBound lower_bound_type,
+       RangePartitionBound upper_bound_type);
+  ~Data() = default;
+
+  // Append one dimension of hash sub-partitioning for this range; returns
+  // InvalidArgument if 'column_names' is empty or 'num_buckets' is less
+  // than two.
+  Status add_hash_partitions(const std::vector<std::string>& column_names,
+                             int32_t num_buckets,
+                             int32_t seed);
+
+  // How the corresponding endpoint of the range is to be interpreted.
+  const RangePartitionBound lower_bound_type_;
+  const RangePartitionBound upper_bound_type_;
+
+  // The bounds of the range; owned by this object.
+  std::unique_ptr<KuduPartialRow> lower_bound_;
+  std::unique_ptr<KuduPartialRow> upper_bound_;
+
+  // Custom hash bucket schemas for this range, one entry per dimension of
+  // hash partitioning; empty if no custom schema was specified.
+  std::vector<HashBucketSchema> hash_bucket_schemas_;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Data);
+};
+
 } // namespace client
 } // namespace kudu
-
-#endif
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index d8f578d..ed6f56f 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -21,6 +21,7 @@
 #include <cstring>
 #include <iterator>
 #include <memory>
+#include <ostream>
 #include <set>
 #include <string>
 #include <unordered_set>
@@ -306,6 +307,9 @@ Status PartitionSchema::ToPB(const Schema& schema, PartitionSchemaPB* pb) const
 
 template<typename Row>
 Status PartitionSchema::EncodeKeyImpl(const Row& row, string* buf) const {
+  // TODO(aserbin): update the implementation and remove the DCHECK() below
+  DCHECK(ranges_with_hash_schemas_.empty())
+      << "ranges with custom hash schemas are not yet supported";
   const KeyEncoder<string>& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
 
   for (const HashBucketSchema& hash_bucket_schema : hash_bucket_schemas_) {
@@ -892,6 +896,10 @@ string PartitionSchema::PartitionKeyDebugString(const KuduPartialRow& row) const
 }
 
 string PartitionSchema::PartitionKeyDebugString(Slice key, const Schema& schema) const {
+  // TODO(aserbin): update the implementation and remove the DCHECK() below
+  DCHECK(ranges_with_hash_schemas_.empty())
+      << "ranges with custom hash schemas are not yet supported";
+
   vector<string> components;
 
   size_t hash_components_size = kEncodedBucketSize * hash_bucket_schemas_.size();
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index b5011ca..540cfd4 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -509,17 +509,24 @@ message CreateTableRequestPB {
   required SchemaPB schema = 2;
   // repeated bytes pre_split_keys = 3;
   // repeated PartialRowPB split_rows = 5;
+
   // Holds either the split rows or the range bounds (or both) of the table.
   optional RowOperationsPB split_rows_range_bounds = 6;
+
   // Holds the table's partition schema, the partition schema's hash bucket schemas
   // are the default for any range where 'range_hash_schemas' is empty.
   optional PartitionSchemaPB partition_schema = 7;
-  // Holds the hash bucket schemas for each range during table creation.
-  // Only populated when 'split_rows_range_bounds' specifies range bounds, must be empty if any
-  // split rows are specified. If this field is set, its size must match the number of ranges
-  // specified by range bounds and they must be in the same order. If this field is empty,
-  // 'partition_schema' is assumed for every range bound.
+
+  // Holds the hash bucket schemas for each range during table creation. Only
+  // populated when 'split_rows_range_bounds' specifies range bounds, must not
+  // be present if any split rows are specified. If this field is present, its
+  // size must match the number of ranges specified by range bounds and they
+  // must be in the same order. If this field is absent, 'partition_schema' is
+  // assumed for every range bound.
   repeated PartitionSchemaPB.PerRangeHashBucketSchemasPB range_hash_schemas = 12;
+
+  // Number of replicas for a partition/tablet, a.k.a. table's replication
+  // factor. All tablets of the same table have the same replication factor.
   optional int32 num_replicas = 4;
 
   // If set, uses the provided value as the table owner when creating the table.

Reply via email to