Repository: kudu Updated Branches: refs/heads/master d5ac00c79 -> ee8592a1f
KUDU-1713: add a client Partitioner API This adds an API which a client can use to determine which partition a row falls into without actually writing that row. This is intended to be used by Impala when writing into Kudu: Impala can pre-shuffle and pre-sort the data being written so as to present a more efficient write pattern to the Kudu tablets. The corresponding Impala patch to use this API is at: https://gerrit.cloudera.org/#/c/6559/ Change-Id: Ic08a078c75f15ef4200219b5260cfb99d79b72cc Reviewed-on: http://gerrit.cloudera.org:8080/5775 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c47730a3 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c47730a3 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c47730a3 Branch: refs/heads/master Commit: c47730a377aef550f2d7a8ee1ded4bd5a2d40245 Parents: d5ac00c Author: Todd Lipcon <[email protected]> Authored: Mon Jan 23 18:32:41 2017 -0800 Committer: Todd Lipcon <[email protected]> Committed: Tue Apr 11 00:40:03 2017 +0000 ---------------------------------------------------------------------- src/kudu/client/CMakeLists.txt | 1 + src/kudu/client/client-test.cc | 110 +++++++++++++++++++++++++++ src/kudu/client/client.cc | 40 ++++++++++ src/kudu/client/client.h | 82 ++++++++++++++++++++ src/kudu/client/partitioner-internal.cc | 85 +++++++++++++++++++++ src/kudu/client/partitioner-internal.h | 73 ++++++++++++++++++ 6 files changed, 391 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/c47730a3/src/kudu/client/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt index 5d38a81..cdc73d9 100644 --- a/src/kudu/client/CMakeLists.txt +++ b/src/kudu/client/CMakeLists.txt @@ -37,6 +37,7 @@ 
set(CLIENT_SRCS error-internal.cc master_rpc.cc meta_cache.cc + partitioner-internal.cc scan_batch.cc scan_configuration.cc scan_predicate.cc http://git-wip-us.apache.org/repos/asf/kudu/blob/c47730a3/src/kudu/client/client-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index 7c2a03f..2159090 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -4870,5 +4870,115 @@ INSTANTIATE_TEST_CASE_P( , ServiceUnavailableRetryClientTest, ::testing::ValuesIn(service_unavailable_retry_cases)); +TEST_F(ClientTest, TestPartitioner) { + // Create a table with the following 9 partitions: + // + // hash bucket + // key 0 1 2 + // ----------------- + // <3333 x x x + // 3333-6666 x x x + // >=6666 x x x + int num_ranges = 3; + const int kNumHashPartitions = 3; + const char* kTableName = "TestPartitioner"; + + vector<unique_ptr<KuduPartialRow>> split_rows; + for (int32_t split : {3333, 6666}) { + unique_ptr<KuduPartialRow> row(schema_.NewRow()); + ASSERT_OK(row->SetInt32("key", split)); + split_rows.emplace_back(std::move(row)); + } + + shared_ptr<KuduTable> table; + gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); + table_creator->table_name(kTableName) + .schema(&schema_) + .num_replicas(1) + .add_hash_partitions({ "key" }, kNumHashPartitions) + .set_range_partition_columns({ "key" }); + + for (const auto& row : split_rows) { + table_creator->add_range_partition_split(new KuduPartialRow(*row)); + } + ASSERT_OK(table_creator->Create()); + ASSERT_OK(client_->OpenTable(kTableName, &table)); + + // Build a partitioner on the table. 
+ unique_ptr<KuduPartitioner> part; + { + KuduPartitioner* part_raw; + ASSERT_OK(KuduPartitionerBuilder(table) + .Build(&part_raw)); + part.reset(part_raw); + } + + ASSERT_EQ(num_ranges * kNumHashPartitions, part->NumPartitions()); + + // Partition a bunch of rows, counting how many fall into each partition. + unique_ptr<KuduPartialRow> row(table->schema().NewRow()); + vector<int> counts_by_partition(part->NumPartitions()); + const int kNumRowsToPartition = 10000; + for (int i = 0; i < kNumRowsToPartition; i++) { + ASSERT_OK(row->SetInt32(0, i)); + int part_index; + ASSERT_OK(part->PartitionRow(*row, &part_index)); + counts_by_partition.at(part_index)++; + } + + // We don't expect a completely even division of rows into partitions, but + // we should be within 10% of that. + int expected_per_partition = kNumRowsToPartition / part->NumPartitions(); + int fuzziness = expected_per_partition / 10; + for (int i = 0; i < part->NumPartitions(); i++) { + ASSERT_NEAR(counts_by_partition[i], expected_per_partition, fuzziness); + } + + // Drop the first and third range partitions. + unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName)); + alterer->DropRangePartition(schema_.NewRow(), new KuduPartialRow(*split_rows[0])); + alterer->DropRangePartition(new KuduPartialRow(*split_rows[1]), schema_.NewRow()); + ASSERT_OK(alterer->Alter()); + + // The existing partitioner should still return results based on the table + // state at the time it was created, and successfully return partitions + // for rows in the now-dropped range. + ASSERT_EQ(num_ranges * kNumHashPartitions, part->NumPartitions()); + ASSERT_OK(row->SetInt32(0, 1000)); + int part_index; + ASSERT_OK(part->PartitionRow(*row, &part_index)); + ASSERT_GE(part_index, 0); + + // If we recreate the partitioner, it should get the new partitioning info. 
+ { + KuduPartitioner* part_raw; + ASSERT_OK(KuduPartitionerBuilder(table) + .Build(&part_raw)); + part.reset(part_raw); + } + num_ranges = 1; + ASSERT_EQ(num_ranges * kNumHashPartitions, part->NumPartitions()); + + // ... and it should return -1 for non-covered ranges. + ASSERT_OK(row->SetInt32(0, 1000)); + ASSERT_OK(part->PartitionRow(*row, &part_index)); + ASSERT_EQ(-1, part_index); + ASSERT_OK(row->SetInt32(0, 8000)); + ASSERT_OK(part->PartitionRow(*row, &part_index)); + ASSERT_EQ(-1, part_index); +} + +TEST_F(ClientTest, TestInvalidPartitionerBuilder) { + KuduPartitioner* part; + Status s = KuduPartitionerBuilder(client_table_) + .SetBuildTimeout(MonoDelta()) + ->Build(&part); + ASSERT_EQ("Invalid argument: uninitialized timeout", s.ToString()); + + s = KuduPartitionerBuilder(sp::shared_ptr<KuduTable>()) + .Build(&part); + ASSERT_EQ("Invalid argument: null table", s.ToString()); +} + } // namespace client } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/c47730a3/src/kudu/client/client.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc index b356227..ff03547 100644 --- a/src/kudu/client/client.cc +++ b/src/kudu/client/client.cc @@ -35,6 +35,7 @@ #include "kudu/client/error-internal.h" #include "kudu/client/error_collector.h" #include "kudu/client/meta_cache.h" +#include "kudu/client/partitioner-internal.h" #include "kudu/client/replica-internal.h" #include "kudu/client/row_result.h" #include "kudu/client/scan_predicate-internal.h" @@ -1610,5 +1611,44 @@ uint16_t KuduTabletServer::port() const { return data_->hp_.port(); } +//////////////////////////////////////////////////////////// +// KuduPartitionerBuilder +//////////////////////////////////////////////////////////// +KuduPartitionerBuilder::KuduPartitionerBuilder(sp::shared_ptr<KuduTable> table) + : data_(new Data(std::move(table))) { +} + +KuduPartitionerBuilder::~KuduPartitionerBuilder() { + 
delete data_; +} + +KuduPartitionerBuilder* KuduPartitionerBuilder::SetBuildTimeout(MonoDelta timeout) { + data_->SetBuildTimeout(timeout); + return this; +} + +Status KuduPartitionerBuilder::Build(KuduPartitioner** partitioner) { + return data_->Build(partitioner); +} + +//////////////////////////////////////////////////////////// +// KuduPartitioner +//////////////////////////////////////////////////////////// +KuduPartitioner::KuduPartitioner(Data* data) + : data_(CHECK_NOTNULL(data)) { +} + +KuduPartitioner::~KuduPartitioner() { + delete data_; +} + +int KuduPartitioner::NumPartitions() const { + return data_->num_partitions_; +} + +Status KuduPartitioner::PartitionRow(const KuduPartialRow& row, int* partition) { + return data_->PartitionRow(row, partition); +} + } // namespace client } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/c47730a3/src/kudu/client/client.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index 06eb895..388ff9b 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -56,6 +56,7 @@ class PartitionSchema; namespace client { class KuduLoggingCallback; +class KuduPartitioner; class KuduScanToken; class KuduSession; class KuduStatusCallback; @@ -511,6 +512,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> { friend class internal::WriteRpc; friend class ClientTest; friend class KuduClientBuilder; + friend class KuduPartitionerBuilder; friend class KuduScanner; friend class KuduScanToken; friend class KuduScanTokenBuilder; @@ -977,6 +979,7 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> { class KUDU_NO_EXPORT Data; friend class KuduClient; + friend class KuduPartitioner; KuduTable(const sp::shared_ptr<KuduClient>& client, const std::string& name, @@ -2214,6 +2217,85 @@ class KUDU_EXPORT KuduScanTokenBuilder { DISALLOW_COPY_AND_ASSIGN(KuduScanTokenBuilder); }; 
+/// @brief Builder for Partitioner instances. +class KUDU_EXPORT KuduPartitionerBuilder { + public: + /// Construct an instance of the class. + /// + /// @param [in] table + /// The table whose rows should be partitioned. + explicit KuduPartitionerBuilder(sp::shared_ptr<KuduTable> table); + ~KuduPartitionerBuilder(); + + /// Set the timeout used for building the Partitioner object. + KuduPartitionerBuilder* SetBuildTimeout(MonoDelta timeout); + + /// Create a KuduPartitioner object for the specified table. + /// + /// This fetches all of the partitioning information up front if it + /// is not already cached by the associated KuduClient object. Thus, + /// it may time out or have an error if the Kudu master is not accessible. + /// + /// @param [out] partitioner + /// The resulting KuduPartitioner instance; caller gets ownership. + /// + /// @note If the KuduClient object associated with the table already has + /// some partition information cached (e.g. due to the construction of + /// other Partitioners, or due to normal read/write activity), the + /// resulting Partitioner will make use of that cached information. + /// This means that the resulting partitioner is not guaranteed to have + /// up-to-date partition information in the case that there has been + /// a recent change to the partitioning of the target table. + Status Build(KuduPartitioner** partitioner); + private: + class KUDU_NO_EXPORT Data; + + // Owned. + Data* data_; + + DISALLOW_COPY_AND_ASSIGN(KuduPartitionerBuilder); +}; + +/// A KuduPartitioner allows clients to determine the target partition of a +/// row without actually performing a write. The set of partitions is eagerly +/// fetched when the KuduPartitioner is constructed so that the actual partitioning +/// step can be performed synchronously without any network trips. 
+/// +/// @note Because this operates on a metadata snapshot retrieved at construction +/// time, it will not reflect any metadata changes to the table that have occurred +/// since its creation. +/// +/// @warning This class is not thread-safe. +class KUDU_EXPORT KuduPartitioner { + public: + ~KuduPartitioner(); + + /// Return the number of partitions known by this partitioner. + /// The partition indices returned by @c PartitionRow are guaranteed + /// to be less than this value. + int NumPartitions() const; + + /// Determine the partition index that the given row falls into. + /// + /// @param [in] row + /// The row to be partitioned. + /// @param [out] partition + /// The resulting partition index, or -1 if the row falls into a + /// non-covered range. The result will be less than @c NumPartitions(). + /// + /// @return Status OK if successful. May return a bad Status if the + /// provided row does not have all columns of the partition key + /// set. + Status PartitionRow(const KuduPartialRow& row, int* partition); + private: + class KUDU_NO_EXPORT Data; + friend class KuduPartitionerBuilder; + + explicit KuduPartitioner(Data* data); + Data* data_; // Owned. +}; + + } // namespace client } // namespace kudu #endif http://git-wip-us.apache.org/repos/asf/kudu/blob/c47730a3/src/kudu/client/partitioner-internal.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/partitioner-internal.cc b/src/kudu/client/partitioner-internal.cc new file mode 100644 index 0000000..c3b80c8 --- /dev/null +++ b/src/kudu/client/partitioner-internal.cc @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/client/partitioner-internal.h" + +#include <glog/logging.h> +#include <map> +#include <memory> +#include <string> + +#include "kudu/client/client-internal.h" +#include "kudu/client/client.h" +#include "kudu/client/meta_cache.h" +#include "kudu/client/table-internal.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/util.h" +#include "kudu/util/status.h" + +using std::string; +using std::unique_ptr; + +namespace kudu { +namespace client { + +Status KuduPartitionerBuilder::Data::Build(KuduPartitioner** partitioner) { + // If any of the builder calls had generated a bad status, then return it here. + RETURN_NOT_OK(status_); + unique_ptr<KuduPartitioner::Data> ret_data(new KuduPartitioner::Data()); + + auto deadline = MonoTime::Now() + timeout_; + auto mc = table_->client()->data_->meta_cache_; + + // Insert a sentinel for the beginning of the table, in case they + // query for any row which falls before the first partition. 
+ ret_data->partitions_by_start_key_[""] = -1; + string next_part_key = ""; + int i = 0; + while (true) { + scoped_refptr<internal::RemoteTablet> tablet; + Synchronizer sync; + mc->LookupTabletByKeyOrNext(table_.get(), next_part_key, deadline, + &tablet, sync.AsStatusCallback()); + Status s = sync.Wait(); + if (s.IsNotFound()) { + // No more tablets + break; + } + RETURN_NOT_OK(s); + const auto& start_key = tablet->partition().partition_key_start(); + const auto& end_key = tablet->partition().partition_key_end(); + ret_data->partitions_by_start_key_[start_key] = i++; + if (end_key.empty()) break; + ret_data->partitions_by_start_key_[end_key] = -1; + next_part_key = end_key; + } + ret_data->num_partitions_ = i; + ret_data->table_ = table_; + *partitioner = new KuduPartitioner(ret_data.release()); + return Status::OK(); +} + +Status KuduPartitioner::Data::PartitionRow( + const KuduPartialRow& row, int* partition) { + tmp_buf_.clear(); + RETURN_NOT_OK(table_->data_->partition_schema_.EncodeKey(row, &tmp_buf_)); + *partition = FindFloorOrDie(partitions_by_start_key_, tmp_buf_); + return Status::OK(); +} + +} // namespace client +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/c47730a3/src/kudu/client/partitioner-internal.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/partitioner-internal.h b/src/kudu/client/partitioner-internal.h new file mode 100644 index 0000000..13f7291 --- /dev/null +++ b/src/kudu/client/partitioner-internal.h @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "kudu/client/client.h" +#include "kudu/common/partition.h" +#include "kudu/util/status.h" + +#include <map> +#include <string> + +namespace kudu { +namespace client { + +class KuduPartitionerBuilder::Data { + public: + explicit Data(sp::shared_ptr<KuduTable> table) + : table_(std::move(table)) { + if (table_) { + timeout_ = table_->client()->default_admin_operation_timeout(); + } else { + status_ = Status::InvalidArgument("null table"); + } + } + + ~Data() {} + + void SetBuildTimeout(MonoDelta timeout) { + if (!timeout.Initialized()) { + status_ = Status::InvalidArgument("uninitialized timeout"); + return; + } + timeout_ = timeout; + } + + Status Build(KuduPartitioner** partitioner); + + private: + const sp::shared_ptr<KuduTable> table_; + + // A deferred Status result, set to non-OK if one of the builder + // parameters is set to an invalid value. + Status status_; + MonoDelta timeout_; +}; + +class KuduPartitioner::Data { + public: + Status PartitionRow(const KuduPartialRow& row, int* partition); + + sp::shared_ptr<KuduTable> table_; + std::map<std::string, int> partitions_by_start_key_; + int num_partitions_ = 0; + std::string tmp_buf_; +}; + + +} // namespace client +} // namespace kudu
