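KUDU-1648 - [python] Expose Setting of Range Partitions

Currently, the Python client does not allow developers to set range partitions. This patch adds that capability and includes updates to existing tests. It also makes PartialRow raise an IndexError when assigned through an out-of-range column index, and adds a test covering invalid PartialRow keys, indexes and value types.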
Change-Id: Ib1e2c9a49196c6dd6644388d08014acd7593d4aa Reviewed-on: http://gerrit.cloudera.org:8080/4795 Tested-by: Kudu Jenkins Reviewed-by: Jean-Daniel Cryans <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0f87b044 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0f87b044 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0f87b044 Branch: refs/heads/master Commit: 0f87b044e53a0eae062a28764154141097d3871e Parents: 13ffec6 Author: Jordan Birdsell <[email protected]> Authored: Fri Oct 21 22:56:26 2016 -0400 Committer: Jean-Daniel Cryans <[email protected]> Committed: Mon Nov 7 20:02:29 2016 +0000 ---------------------------------------------------------------------- python/kudu/__init__.py | 4 +- python/kudu/client.pyx | 129 ++++++++++++++++++++++++++++++++-- python/kudu/libkudu_client.pxd | 9 +++ python/kudu/tests/test_client.py | 26 ++++++- python/kudu/tests/util.py | 17 ++++- 5 files changed, 174 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/0f87b044/python/kudu/__init__.py ---------------------------------------------------------------------- diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py index 828c3cf..2a6623c 100644 --- a/python/kudu/__init__.py +++ b/python/kudu/__init__.py @@ -26,7 +26,9 @@ from kudu.client import (Client, Table, Scanner, Session, # noqa FLUSH_AUTO_SYNC, FLUSH_MANUAL, READ_LATEST, - READ_AT_SNAPSHOT) + READ_AT_SNAPSHOT, + EXCLUSIVE_BOUND, + INCLUSIVE_BOUND) from kudu.errors import (KuduException, KuduBadStatus, KuduNotFound, # noqa KuduNotSupported, http://git-wip-us.apache.org/repos/asf/kudu/blob/0f87b044/python/kudu/client.pyx ---------------------------------------------------------------------- diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx index 501aaaa..0977b30 100644 --- a/python/kudu/client.pyx +++ 
b/python/kudu/client.pyx @@ -67,6 +67,33 @@ cdef dict _type_names = { KUDU_UNIXTIME_MICROS : "KUDU_UNIXTIME_MICROS" } +# Range Partition Bound Type enums +EXCLUSIVE_BOUND = PartitionType_Exclusive +INCLUSIVE_BOUND = PartitionType_Inclusive + +cdef dict _partition_bound_types = { + 'exclusive': PartitionType_Exclusive, + 'inclusive': PartitionType_Inclusive +} + +def _check_convert_range_bound_type(bound): + # Convert bounds types to constants and raise exception if invalid. + def invalid_bound_type(bound_type): + raise ValueError('Invalid range partition bound type: {0}' + .format(bound_type)) + + if isinstance(bound, int): + if bound >= len(_partition_bound_types) \ + or bound < 0: + invalid_bound_type(bound) + else: + return bound + else: + try: + return _partition_bound_types[bound.lower()] + except KeyError: + invalid_bound_type(bound) + cdef class TimeDelta: """ @@ -304,7 +331,7 @@ cdef class Client: try: c.table_name(tobytes(table_name)) c.schema(schema.schema) - self._apply_partitioning(c, partitioning) + self._apply_partitioning(c, partitioning, schema) if n_replicas: c.num_replicas(n_replicas) s = c.Create() @@ -312,10 +339,13 @@ cdef class Client: finally: del c - cdef _apply_partitioning(self, KuduTableCreator* c, part): + cdef _apply_partitioning(self, KuduTableCreator* c, part, Schema schema): cdef: vector[string] v - PartialRow py_row + PartialRow lower_bound + PartialRow upper_bound + PartialRow split_row + # Apply hash partitioning. 
for col_names, num_buckets, seed in part._hash_partitions: v.clear() @@ -331,6 +361,32 @@ cdef class Client: for n in part._range_partition_cols: v.push_back(tobytes(n)) c.set_range_partition_columns(v) + if part._range_partitions: + for partition in part._range_partitions: + if not isinstance(partition[0], PartialRow): + lower_bound = schema.new_row(partition[0]) + else: + lower_bound = partition[0] + lower_bound._own = 0 + if not isinstance(partition[1], PartialRow): + upper_bound = schema.new_row(partition[1]) + else: + upper_bound = partition[1] + upper_bound._own = 0 + c.add_range_partition( + lower_bound.row, + upper_bound.row, + _check_convert_range_bound_type(partition[2]), + _check_convert_range_bound_type(partition[3]) + ) + if part._range_partition_splits: + for split in part._range_partition_splits: + if not isinstance(split, PartialRow): + split_row = schema.new_row(split) + else: + split_row = split + split_row._own = 0 + c.add_range_partition_split(split_row.row) def delete_table(self, table_name): """ @@ -944,6 +1000,8 @@ class Partitioning(object): def __init__(self): self._hash_partitions = [] self._range_partition_cols = None + self._range_partitions = [] + self._range_partition_splits = [] def add_hash_partitions(self, column_names, num_buckets, seed=None): """ @@ -994,9 +1052,62 @@ class Partitioning(object): self._range_partition_cols = column_names return self - # TODO: implement split_rows. - # This is slightly tricky since currently the PartialRow constructor requires a - # Table object, which doesn't exist yet. Should we use tuples instead? + def add_range_partition(self, lower_bound=None, + upper_bound=None, + lower_bound_type='inclusive', + upper_bound_type='exclusive'): + """ + Add a range partition to the table. + + Multiple range partitions may be added, but they must not overlap. + All range splits specified by add_range_partition_split must fall + in a range partition. The lower bound must be less than or equal + to the upper bound. 
+ + If this method is not called, the table's range will be unbounded. + + Parameters + ---------- + lower_bound : PartialRow/list/tuple/dict + upper_bound : PartialRow/list/tuple/dict + lower_bound_type : {'inclusive', 'exclusive'} or constants + kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND + upper_bound_type : {'inclusive', 'exclusive'} or constants + kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND + + Returns + ------- + self : Partitioning + """ + if self._range_partition_cols: + self._range_partitions.append( + (lower_bound, upper_bound, lower_bound_type, upper_bound_type) + ) + else: + raise ValueError("Range Partition Columns must be set before " + + "adding a range partition.") + + return self + + def add_range_partition_split(self, split_row): + """ + Add a range partition split at the provided row. + + Parameters + ---------- + split_row : PartialRow/list/tuple/dict + + Returns + ------- + self : Partitioning + """ + if self._range_partition_cols: + self._range_partition_splits.append(split_row) + else: + raise ValueError("Range Partition Columns must be set before " + + "adding a range partition split.") + + return self cdef class Predicate: @@ -2244,7 +2355,11 @@ cdef class PartialRow: if isinstance(key, basestring): self.set_field(key, value) else: - self.set_loc(key, value) + if 0 <= key < len(self.schema): + self.set_loc(key, value) + else: + raise IndexError("Column index {0} is out of bounds." 
+ .format(key)) def from_record(self, record): """ http://git-wip-us.apache.org/repos/asf/kudu/blob/0f87b044/python/kudu/libkudu_client.pxd ---------------------------------------------------------------------- diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd index 643e349..9f2bf09 100644 --- a/python/kudu/libkudu_client.pxd +++ b/python/kudu/libkudu_client.pxd @@ -464,6 +464,10 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil: ReadMode_Latest " kudu::client::KuduScanner::READ_LATEST" ReadMode_Snapshot " kudu::client::KuduScanner::READ_AT_SNAPSHOT" + enum RangePartitionBound" kudu::client::KuduTableCreator::RangePartitionBound": + PartitionType_Exclusive " kudu::client::KuduTableCreator::EXCLUSIVE_BOUND" + PartitionType_Inclusive " kudu::client::KuduTableCreator::INCLUSIVE_BOUND" + cdef cppclass KuduClient: Status DeleteTable(const string& table_name) @@ -518,6 +522,11 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil: int num_buckets, int seed) KuduTableCreator& set_range_partition_columns(vector[string]& columns) + KuduTableCreator& add_range_partition(KuduPartialRow* lower_bound, + KuduPartialRow* upper_bound, + RangePartitionBound lower_bound_type, + RangePartitionBound upper_bound_type) + KuduTableCreator& add_range_partition_split(KuduPartialRow* split_row) KuduTableCreator& split_rows(vector[const KuduPartialRow*]& split_rows) KuduTableCreator& num_replicas(int n_replicas) KuduTableCreator& wait(c_bool wait) http://git-wip-us.apache.org/repos/asf/kudu/blob/0f87b044/python/kudu/tests/test_client.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py index 0c97ee9..10bbda4 100644 --- a/python/kudu/tests/test_client.py +++ b/python/kudu/tests/test_client.py @@ -132,7 +132,12 @@ class TestClient(KuduTestBase, unittest.TestCase): self.client.create_table( name, self.schema, - 
partitioning=Partitioning().set_range_partition_columns([])) + partitioning=Partitioning() + .set_range_partition_columns(['key']) + .add_range_partition_split({'key': 10}) + .add_range_partition_split([20]) + .add_range_partition_split((30,)) + ) self.client.delete_table(name) self.client.create_table( @@ -246,6 +251,25 @@ class TestClient(KuduTestBase, unittest.TestCase): assert tserver.hostname() is not None assert tserver.port() is not None + def test_bad_partialrow(self): + table = self.client.table(self.ex_table) + op = table.new_insert() + # Test bad keys or indexes + keys = [ + ('not-there', KeyError), + (len(self.schema) + 1, IndexError), + (-1, IndexError) + ] + + for key in keys: + with self.assertRaises(key[1]): + op[key[0]] = 'test' + + # Test incorrectly typed data + with self.assertRaises(TypeError): + op['int_val'] = 'incorrect' + + class TestMonoDelta(unittest.TestCase): def test_empty_ctor(self): http://git-wip-us.apache.org/repos/asf/kudu/blob/0f87b044/python/kudu/tests/util.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/util.py b/python/kudu/tests/util.py index a6bbae6..2783111 100644 --- a/python/kudu/tests/util.py +++ b/python/kudu/tests/util.py @@ -65,7 +65,7 @@ class TestScanBase(KuduTestBase, unittest.TestCase): table_name = 'type-test' # Create schema, partitioning and then table builder = kudu.schema_builder() - builder.add_column('key').type(kudu.int64).nullable(False).primary_key() + builder.add_column('key').type(kudu.int64).nullable(False) builder.add_column('unixtime_micros_val', type_=kudu.unixtime_micros, nullable=False) builder.add_column('string_val', type_=kudu.string, compression=kudu.COMPRESSION_LZ4, encoding='prefix') builder.add_column('bool_val', type_=kudu.bool) @@ -73,13 +73,26 @@ class TestScanBase(KuduTestBase, unittest.TestCase): builder.add_column('int8_val', type_=kudu.int8) builder.add_column('binary_val', type_='binary', compression=kudu.COMPRESSION_SNAPPY, 
encoding='prefix') builder.add_column('float_val', type_=kudu.float) + builder.set_primary_keys(['key', 'unixtime_micros_val']) schema = builder.build() self.projected_names_w_o_float = [ col for col in schema.names if col != 'float_val' ] - partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3) + partitioning = Partitioning() \ + .add_hash_partitions(column_names=['key'], num_buckets=3)\ + .set_range_partition_columns(['unixtime_micros_val'])\ + .add_range_partition( + upper_bound={'unixtime_micros_val': ("2016-01-01", "%Y-%m-%d")}, + upper_bound_type=kudu.EXCLUSIVE_BOUND + )\ + .add_range_partition( + lower_bound={'unixtime_micros_val': datetime.datetime(2016, 1, 1)}, + lower_bound_type='INCLUSIVE', + upper_bound={'unixtime_micros_val': datetime.datetime(9999, 12, 31)} + ) + self.client.create_table(table_name, schema, partitioning) self.type_table = self.client.table(table_name)
