Repository: kudu
Updated Branches:
  refs/heads/master d5ac00c79 -> ee8592a1f


KUDU-1713: add a client Partitioner API

This adds an API which a client can use to determine which partition a
row falls into without actually writing that row. This is intended to be
used by Impala when writing into Kudu: Impala can pre-shuffle and
pre-sort the data being written so as to present a more efficient write
pattern to the Kudu tablets.

The corresponding Impala patch to use this API is at:
https://gerrit.cloudera.org/#/c/6559/

Change-Id: Ic08a078c75f15ef4200219b5260cfb99d79b72cc
Reviewed-on: http://gerrit.cloudera.org:8080/5775
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c47730a3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c47730a3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c47730a3

Branch: refs/heads/master
Commit: c47730a377aef550f2d7a8ee1ded4bd5a2d40245
Parents: d5ac00c
Author: Todd Lipcon <[email protected]>
Authored: Mon Jan 23 18:32:41 2017 -0800
Committer: Todd Lipcon <[email protected]>
Committed: Tue Apr 11 00:40:03 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/CMakeLists.txt          |   1 +
 src/kudu/client/client-test.cc          | 110 +++++++++++++++++++++++++++
 src/kudu/client/client.cc               |  40 ++++++++++
 src/kudu/client/client.h                |  82 ++++++++++++++++++++
 src/kudu/client/partitioner-internal.cc |  85 +++++++++++++++++++++
 src/kudu/client/partitioner-internal.h  |  73 ++++++++++++++++++
 6 files changed, 391 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/c47730a3/src/kudu/client/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index 5d38a81..cdc73d9 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -37,6 +37,7 @@ set(CLIENT_SRCS
   error-internal.cc
   master_rpc.cc
   meta_cache.cc
+  partitioner-internal.cc
   scan_batch.cc
   scan_configuration.cc
   scan_predicate.cc

http://git-wip-us.apache.org/repos/asf/kudu/blob/c47730a3/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 7c2a03f..2159090 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -4870,5 +4870,115 @@ INSTANTIATE_TEST_CASE_P(
     , ServiceUnavailableRetryClientTest,
     ::testing::ValuesIn(service_unavailable_retry_cases));
 
+TEST_F(ClientTest, TestPartitioner) {
+  // Create a table with the following 9 partitions (3 range partitions split
+  // at keys 3333 and 6666, each with 3 hash buckets):
+  //
+  //                hash bucket
+  //     key        0     1     2
+  //             -----------------
+  //    <3333       x     x     x
+  // [3333,6666)    x     x     x
+  //   >=6666       x     x     x
+  int num_ranges = 3;
+  const int kNumHashPartitions = 3;
+  const char* kTableName = "TestPartitioner";
+
+  vector<unique_ptr<KuduPartialRow>> split_rows;
+  for (int32_t split : {3333, 6666}) {
+    unique_ptr<KuduPartialRow> row(schema_.NewRow());
+    ASSERT_OK(row->SetInt32("key", split));
+    split_rows.emplace_back(std::move(row));
+  }
+
+  shared_ptr<KuduTable> table;
+  gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  table_creator->table_name(kTableName)
+      .schema(&schema_)
+      .num_replicas(1)
+      .add_hash_partitions({ "key" }, kNumHashPartitions)
+      .set_range_partition_columns({ "key" });
+
+  // Pass copies of the split rows: the originals are reused below to drop
+  // range partitions.
+  for (const auto& row : split_rows) {
+    table_creator->add_range_partition_split(new KuduPartialRow(*row));
+  }
+  ASSERT_OK(table_creator->Create());
+  ASSERT_OK(client_->OpenTable(kTableName, &table));
+
+  // Build a partitioner on the table.
+  unique_ptr<KuduPartitioner> part;
+  {
+    KuduPartitioner* part_raw;
+    ASSERT_OK(KuduPartitionerBuilder(table)
+              .Build(&part_raw));
+    part.reset(part_raw);
+  }
+
+  ASSERT_EQ(num_ranges * kNumHashPartitions, part->NumPartitions());
+
+  // Partition a bunch of rows, counting how many fall into each partition.
+  unique_ptr<KuduPartialRow> row(table->schema().NewRow());
+  vector<int> counts_by_partition(part->NumPartitions());
+  const int kNumRowsToPartition = 10000;
+  for (int i = 0; i < kNumRowsToPartition; i++) {
+    ASSERT_OK(row->SetInt32(0, i));
+    int part_index;
+    ASSERT_OK(part->PartitionRow(*row, &part_index));
+    // Every key is covered at this point; fail cleanly rather than letting
+    // vector::at() throw on a -1 index.
+    ASSERT_GE(part_index, 0);
+    counts_by_partition.at(part_index)++;
+  }
+
+  // We don't expect a completely even division of rows into partitions, but
+  // we should be within 10% of that.
+  int expected_per_partition = kNumRowsToPartition / part->NumPartitions();
+  int fuzziness = expected_per_partition / 10;
+  for (int i = 0; i < part->NumPartitions(); i++) {
+    ASSERT_NEAR(counts_by_partition[i], expected_per_partition, fuzziness);
+  }
+
+  // Drop the first and third range partitions.
+  unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(kTableName));
+  alterer->DropRangePartition(schema_.NewRow(), new KuduPartialRow(*split_rows[0]));
+  alterer->DropRangePartition(new KuduPartialRow(*split_rows[1]), schema_.NewRow());
+  ASSERT_OK(alterer->Alter());
+
+  // The existing partitioner should still return results based on the table
+  // state at the time it was created, and successfully return partitions
+  // for rows in the now-dropped range.
+  ASSERT_EQ(num_ranges * kNumHashPartitions, part->NumPartitions());
+  ASSERT_OK(row->SetInt32(0, 1000));
+  int part_index;
+  ASSERT_OK(part->PartitionRow(*row, &part_index));
+  ASSERT_GE(part_index, 0);
+
+  // If we recreate the partitioner, it should get the new partitioning info.
+  {
+    KuduPartitioner* part_raw;
+    ASSERT_OK(KuduPartitionerBuilder(table)
+              .Build(&part_raw));
+    part.reset(part_raw);
+  }
+  num_ranges = 1;
+  ASSERT_EQ(num_ranges * kNumHashPartitions, part->NumPartitions());
+
+  // Rows in the remaining middle range still map to a valid partition...
+  ASSERT_OK(row->SetInt32(0, 5000));
+  ASSERT_OK(part->PartitionRow(*row, &part_index));
+  ASSERT_GE(part_index, 0);
+  ASSERT_LT(part_index, part->NumPartitions());
+
+  // ... and it should return -1 for non-covered ranges.
+  ASSERT_OK(row->SetInt32(0, 1000));
+  ASSERT_OK(part->PartitionRow(*row, &part_index));
+  ASSERT_EQ(-1, part_index);
+  ASSERT_OK(row->SetInt32(0, 8000));
+  ASSERT_OK(part->PartitionRow(*row, &part_index));
+  ASSERT_EQ(-1, part_index);
+}
+
+TEST_F(ClientTest, TestInvalidPartitionerBuilder) {
+  KuduPartitioner* partitioner;
+
+  // An uninitialized (default-constructed) MonoDelta is not a valid timeout;
+  // the error is deferred until Build().
+  KuduPartitionerBuilder bad_timeout_builder(client_table_);
+  bad_timeout_builder.SetBuildTimeout(MonoDelta());
+  Status s = bad_timeout_builder.Build(&partitioner);
+  ASSERT_EQ("Invalid argument: uninitialized timeout", s.ToString());
+
+  // A builder over a null table refuses to build.
+  sp::shared_ptr<KuduTable> null_table;
+  KuduPartitionerBuilder null_table_builder(null_table);
+  s = null_table_builder.Build(&partitioner);
+  ASSERT_EQ("Invalid argument: null table", s.ToString());
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/c47730a3/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index b356227..ff03547 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -35,6 +35,7 @@
 #include "kudu/client/error-internal.h"
 #include "kudu/client/error_collector.h"
 #include "kudu/client/meta_cache.h"
+#include "kudu/client/partitioner-internal.h"
 #include "kudu/client/replica-internal.h"
 #include "kudu/client/row_result.h"
 #include "kudu/client/scan_predicate-internal.h"
@@ -1610,5 +1611,44 @@ uint16_t KuduTabletServer::port() const {
   return data_->hp_.port();
 }
 
+////////////////////////////////////////////////////////////
+// KuduPartitionerBuilder
+////////////////////////////////////////////////////////////
+KuduPartitionerBuilder::KuduPartitionerBuilder(sp::shared_ptr<KuduTable> table)
+    : data_(new Data(std::move(table))) {
+}
+
+KuduPartitionerBuilder::~KuduPartitionerBuilder() {
+  delete data_;
+}
+
+KuduPartitionerBuilder* KuduPartitionerBuilder::SetBuildTimeout(MonoDelta timeout) {
+  // Validation is deferred: an invalid timeout is recorded by Data and the
+  // resulting error is returned from Build().
+  data_->SetBuildTimeout(timeout);
+  // Return 'this' so that calls can be chained.
+  return this;
+}
+
+Status KuduPartitionerBuilder::Build(KuduPartitioner** partitioner) {
+  return data_->Build(partitioner);
+}
+
+////////////////////////////////////////////////////////////
+// KuduPartitioner
+////////////////////////////////////////////////////////////
+// Takes ownership of 'data', which must be non-null.
+KuduPartitioner::KuduPartitioner(Data* data)
+    : data_(CHECK_NOTNULL(data)) {
+}
+
+KuduPartitioner::~KuduPartitioner() {
+  delete data_;
+}
+
+int KuduPartitioner::NumPartitions() const {
+  return data_->num_partitions_;
+}
+
+Status KuduPartitioner::PartitionRow(const KuduPartialRow& row, int* partition) {
+  return data_->PartitionRow(row, partition);
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/c47730a3/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 06eb895..388ff9b 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -56,6 +56,7 @@ class PartitionSchema;
 namespace client {
 
 class KuduLoggingCallback;
+class KuduPartitioner;
 class KuduScanToken;
 class KuduSession;
 class KuduStatusCallback;
@@ -511,6 +512,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   friend class internal::WriteRpc;
   friend class ClientTest;
   friend class KuduClientBuilder;
+  friend class KuduPartitionerBuilder;
   friend class KuduScanner;
   friend class KuduScanToken;
   friend class KuduScanTokenBuilder;
@@ -977,6 +979,7 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {
   class KUDU_NO_EXPORT Data;
 
   friend class KuduClient;
+  friend class KuduPartitioner;
 
   KuduTable(const sp::shared_ptr<KuduClient>& client,
             const std::string& name,
@@ -2214,6 +2217,85 @@ class KUDU_EXPORT KuduScanTokenBuilder {
   DISALLOW_COPY_AND_ASSIGN(KuduScanTokenBuilder);
 };
 
+/// @brief Builder for KuduPartitioner instances.
+class KUDU_EXPORT KuduPartitionerBuilder {
+ public:
+  /// Construct an instance of the class.
+  ///
+  /// @param [in] table
+  ///   The table whose rows should be partitioned. If this is null,
+  ///   Build() returns an InvalidArgument error.
+  explicit KuduPartitionerBuilder(sp::shared_ptr<KuduTable> table);
+  ~KuduPartitionerBuilder();
+
+  /// Set the timeout used for building the KuduPartitioner object.
+  ///
+  /// @param [in] timeout
+  ///   Timeout for fetching the partitioning metadata. If not set, the
+  ///   client's default admin operation timeout is used. An uninitialized
+  ///   value causes Build() to return an InvalidArgument error.
+  /// @return Pointer to this builder, to allow call chaining.
+  KuduPartitionerBuilder* SetBuildTimeout(MonoDelta timeout);
+
+  /// Create a KuduPartitioner object for the specified table.
+  ///
+  /// This fetches all of the partitioning information up front if it
+  /// is not already cached by the associated KuduClient object. Thus,
+  /// it may time out or have an error if the Kudu master is not accessible.
+  ///
+  /// @param [out] partitioner
+  ///   The resulting KuduPartitioner instance; caller gets ownership.
+  ///
+  /// @note If the KuduClient object associated with the table already has
+  /// some partition information cached (e.g. due to the construction of
+  /// other Partitioners, or due to normal read/write activity), the
+  /// resulting Partitioner will make use of that cached information.
+  /// This means that the resulting partitioner is not guaranteed to have
+  /// up-to-date partition information in the case that there has been
+  /// a recent change to the partitioning of the target table.
+  ///
+  /// @return Operation result status.
+  Status Build(KuduPartitioner** partitioner);
+
+ private:
+  class KUDU_NO_EXPORT Data;
+
+  // Owned.
+  Data* data_;
+
+  DISALLOW_COPY_AND_ASSIGN(KuduPartitionerBuilder);
+};
+
+/// A KuduPartitioner allows clients to determine the target partition of a
+/// row without actually performing a write. The set of partitions is eagerly
+/// fetched when the KuduPartitioner is constructed so that the actual
+/// partitioning step can be performed synchronously without any network trips.
+///
+/// @note Because this operates on a metadata snapshot retrieved at
+/// construction time, it will not reflect any metadata changes to the table
+/// that have occurred since its creation.
+///
+/// @warning This class is not thread-safe.
+class KUDU_EXPORT KuduPartitioner {
+ public:
+  ~KuduPartitioner();
+
+  /// Return the number of partitions known by this partitioner.
+  /// The partition indices returned by @c PartitionRow are guaranteed
+  /// to be less than this value.
+  ///
+  /// @return The number of partitions known by this partitioner.
+  int NumPartitions() const;
+
+  /// Determine the partition index that the given row falls into.
+  ///
+  /// @param [in] row
+  ///   The row to be partitioned.
+  /// @param [out] partition
+  ///   The resulting partition index, or -1 if the row falls into a
+  ///   non-covered range. The result will be less than @c NumPartitions().
+  ///
+  /// @return Status OK if successful. May return a bad Status if the
+  /// provided row does not have all columns of the partition key
+  /// set.
+  Status PartitionRow(const KuduPartialRow& row, int* partition);
+
+ private:
+  class KUDU_NO_EXPORT Data;
+  friend class KuduPartitionerBuilder;
+
+  explicit KuduPartitioner(Data* data);
+  Data* data_; // Owned.
+
+  // 'data_' is an owned raw pointer deleted in the destructor; a copy would
+  // lead to a double delete.
+  DISALLOW_COPY_AND_ASSIGN(KuduPartitioner);
+};
+
+
 } // namespace client
 } // namespace kudu
 #endif

http://git-wip-us.apache.org/repos/asf/kudu/blob/c47730a3/src/kudu/client/partitioner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/partitioner-internal.cc b/src/kudu/client/partitioner-internal.cc
new file mode 100644
index 0000000..c3b80c8
--- /dev/null
+++ b/src/kudu/client/partitioner-internal.cc
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/client/partitioner-internal.h"
+
+#include <glog/logging.h>
+#include <map>
+#include <memory>
+#include <string>
+
+#include "kudu/client/client-internal.h"
+#include "kudu/client/client.h"
+#include "kudu/client/meta_cache.h"
+#include "kudu/client/table-internal.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/status.h"
+
+using std::string;
+using std::unique_ptr;
+
+namespace kudu {
+namespace client {
+
+Status KuduPartitionerBuilder::Data::Build(KuduPartitioner** partitioner) {
+  // If any of the builder calls had generated a bad status, then return it here.
+  RETURN_NOT_OK(status_);
+  unique_ptr<KuduPartitioner::Data> ret_data(new KuduPartitioner::Data());
+
+  auto deadline = MonoTime::Now() + timeout_;
+  auto mc = table_->client()->data_->meta_cache_;
+
+  // Build a map from partition key to partition index by walking the table's
+  // tablets, each lookup resuming at the previous tablet's end key. Each
+  // tablet's start key maps to its index and each non-empty end key maps to
+  // -1. If the next tablet begins exactly where the previous one ended, its
+  // index overwrites that -1; otherwise the -1 marks a non-covered range.
+  //
+  // Insert a sentinel for the beginning of the table, in case a caller
+  // queries for any row which falls before the first partition.
+  ret_data->partitions_by_start_key_[""] = -1;
+  string next_part_key;
+  int num_partitions = 0;
+  while (true) {
+    scoped_refptr<internal::RemoteTablet> tablet;
+    Synchronizer sync;
+    mc->LookupTabletByKeyOrNext(table_.get(), next_part_key, deadline,
+                                &tablet, sync.AsStatusCallback());
+    Status s = sync.Wait();
+    if (s.IsNotFound()) {
+      // No more tablets.
+      break;
+    }
+    RETURN_NOT_OK(s);
+    const auto& start_key = tablet->partition().partition_key_start();
+    const auto& end_key = tablet->partition().partition_key_end();
+    ret_data->partitions_by_start_key_[start_key] = num_partitions++;
+    // An empty end key terminates the walk: this tablet is unbounded above.
+    if (end_key.empty()) break;
+    ret_data->partitions_by_start_key_[end_key] = -1;
+    next_part_key = end_key;
+  }
+  ret_data->num_partitions_ = num_partitions;
+  ret_data->table_ = table_;
+  *partitioner = new KuduPartitioner(ret_data.release());
+  return Status::OK();
+}
+
+Status KuduPartitioner::Data::PartitionRow(const KuduPartialRow& row,
+                                           int* partition) {
+  // Encode into the reusable scratch buffer rather than a fresh string.
+  string* const key = &tmp_buf_;
+  key->clear();
+  RETURN_NOT_OK(table_->data_->partition_schema_.EncodeKey(row, key));
+
+  // Take the entry with the greatest start key not exceeding the encoded key.
+  // Build() always inserts an entry for the empty key, so a floor exists.
+  *partition = FindFloorOrDie(partitions_by_start_key_, *key);
+  return Status::OK();
+}
+
+} // namespace client
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/c47730a3/src/kudu/client/partitioner-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/partitioner-internal.h b/src/kudu/client/partitioner-internal.h
new file mode 100644
index 0000000..13f7291
--- /dev/null
+++ b/src/kudu/client/partitioner-internal.h
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/client/client.h"
+#include "kudu/common/partition.h"
+#include "kudu/util/status.h"
+
+#include <map>
+#include <string>
+
+namespace kudu {
+namespace client {
+
+class KuduPartitionerBuilder::Data {
+ public:
+  // A null 'table' is not rejected here; the error is recorded in status_
+  // and surfaced by Build().
+  explicit Data(sp::shared_ptr<KuduTable> table)
+      : table_(std::move(table)) {
+    if (!table_) {
+      status_ = Status::InvalidArgument("null table");
+      return;
+    }
+    // Start from the client-wide admin timeout; SetBuildTimeout() may override.
+    timeout_ = table_->client()->default_admin_operation_timeout();
+  }
+
+  ~Data() = default;
+
+  // Overrides the build timeout. An uninitialized MonoDelta is not applied;
+  // instead a deferred InvalidArgument error is recorded.
+  void SetBuildTimeout(MonoDelta timeout) {
+    if (timeout.Initialized()) {
+      timeout_ = timeout;
+    } else {
+      status_ = Status::InvalidArgument("uninitialized timeout");
+    }
+  }
+
+  // Fetches the table's tablet layout and produces a KuduPartitioner.
+  Status Build(KuduPartitioner** partitioner);
+
+ private:
+  // The table whose rows will be partitioned; may be null (see status_).
+  const sp::shared_ptr<KuduTable> table_;
+
+  // A deferred Status result, set to non-OK if one of the builder
+  // parameters is set to an invalid value.
+  Status status_;
+
+  // Time budget for fetching partition metadata in Build().
+  MonoDelta timeout_;
+};
+
+class KuduPartitioner::Data {
+ public:
+  // Encodes 'row' as a partition key and stores the index of the partition
+  // containing it in '*partition' (-1 for a non-covered range).
+  Status PartitionRow(const KuduPartialRow& row, int* partition);
+
+  // The partitioned table; supplies the partition schema used for encoding.
+  sp::shared_ptr<KuduTable> table_;
+
+  // Maps a partition-key lower bound to a partition index, or to -1 when the
+  // range starting at that key is not covered by any partition.
+  std::map<std::string, int> partitions_by_start_key_;
+
+  // Number of partitions discovered when this object was built.
+  int num_partitions_{0};
+
+  // Scratch buffer for the encoded key, reused across PartitionRow() calls.
+  std::string tmp_buf_;
+};
+
+
+} // namespace client
+} // namespace kudu

Reply via email to