KUDU-1638 - [python] Add Alter Table Support

The Python client currently doesn't support altering tables. This patch
adds this capability and includes tests.
Change-Id: If72c76e1ea7c80452b401c55b19df4fbac7dd2d7
Reviewed-on: http://gerrit.cloudera.org:8080/4823
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fec9b887
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fec9b887
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fec9b887

Branch: refs/heads/master
Commit: fec9b8879608c54a6b61d404d65bbd8798b917e7
Parents: e1433ff
Author: Jordan Birdsell <[email protected]>
Authored: Mon Oct 24 08:19:38 2016 -0400
Committer: Todd Lipcon <[email protected]>
Committed: Fri Nov 11 05:02:23 2016 +0000

----------------------------------------------------------------------
 python/kudu/client.pxd           |  41 +++++
 python/kudu/client.pyx           | 284 +++++++++++++++++++++++++++++++---
 python/kudu/libkudu_client.pxd   |  32 ++--
 python/kudu/schema.pyx           |   9 +-
 python/kudu/tests/test_client.py |  65 ++++++++
 python/kudu/tests/test_schema.py |  11 ++
 6 files changed, 399 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fec9b887/python/kudu/client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/client.pxd b/python/kudu/client.pxd
index 375da00..f28a0ce 100644
--- a/python/kudu/client.pxd
+++ b/python/kudu/client.pxd
@@ -20,6 +20,20 @@ from libkudu_client cimport *
 from kudu.schema cimport Schema
 
 
+cdef class Client:
+
+    cdef:
+        shared_ptr[KuduClient] client
+        KuduClient* cp
+
+    cdef readonly:
+        list master_addrs
+
+    cpdef close(self)
+
+    cdef _apply_partitioning(self, KuduTableCreator* c, part, Schema schema)
+
+
 cdef class Session:
     cdef:
         shared_ptr[KuduSession] s
@@ -40,3 +54,30 @@ cdef class PartialRow:
     cpdef set_loc_null(self, int i)
 
     cdef add_to_session(self, Session s)
+
+
+cdef class Table:
+
+    cdef:
+        shared_ptr[KuduTable] table
+
+    cdef readonly:
+        object _name
+        Schema schema
+        Client parent
+        int num_replicas
+
+    cdef init(self)
+
+    cdef inline KuduTable* ptr(self):
+        return self.table.get()
+
+
+cdef class TableAlterer:
+
+    cdef:
+        KuduTableAlterer* _alterer
+        Table _table
+        object _new_name
+
+    cdef _init(self, KuduTableAlterer* alterer)

http://git-wip-us.apache.org/repos/asf/kudu/blob/fec9b887/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 328c127..f5a87f8 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -27,7 +27,7 @@ from cython.operator cimport dereference as deref
 from libkudu_client cimport *
 
 from kudu.compat import tobytes, frombytes, dict_iter
-from kudu.schema cimport Schema, ColumnSchema, KuduValue, KuduType
+from kudu.schema cimport Schema, ColumnSchema, ColumnSpec, KuduValue, KuduType
 from kudu.errors cimport check_status
 from kudu.util import to_unixtime_micros, from_unixtime_micros, from_hybridtime
 from errors import KuduException
@@ -236,13 +236,6 @@ cdef class Client:
     kudu.connect instead.
     """
 
-    cdef:
-        shared_ptr[KuduClient] client
-        KuduClient* cp
-
-    cdef readonly:
-        list master_addrs
-
     def __cinit__(self, addr_or_addrs, admin_timeout_ms=None,
                   rpc_timeout_ms=None):
         cdef:
@@ -530,6 +523,32 @@ cdef class Client:
 
         return result
 
+    def new_table_alterer(self, Table table):
+        """
+        Create a TableAlterer object that can be used to apply a set of steps
+        to alter a table.
+
+        Parameters
+        ----------
+        table : Table
+          Table to alter. NOTE: The TableAlterer.alter() method will return
+          a new Table object with the updated information.
+
+        Examples
+        --------
+        table = client.table('example')
+        alterer = client.new_table_alterer(table)
+        table = alterer.rename('example2').alter()
+
+        Returns
+        -------
+        alterer : TableAlterer
+        """
+        cdef:
+            TableAlterer alterer = TableAlterer(table)
+
+        alterer._init(self.cp.NewTableAlterer(tobytes(table.name)))
+        return alterer
 
 #----------------------------------------------------------------------
 
@@ -687,15 +706,6 @@ cdef class Table:
     using the kudu.Client.table method after connecting to a cluster.
     """
 
-    cdef:
-        shared_ptr[KuduTable] table
-
-    cdef readonly:
-        object _name
-        Schema schema
-        Client parent
-        int num_replicas
-
     def __cinit__(self, name, Client client):
         self._name = name
         self.parent = client
@@ -854,9 +864,6 @@ cdef class Table:
         """
         return ScanTokenBuilder(self)
 
-    cdef inline KuduTable* ptr(self):
-        return self.table.get()
-
 
 cdef class Column:
 
@@ -2546,3 +2553,240 @@ cdef inline cast_pyvalue(DataType t, object o):
         return StringVal(o)
     else:
         raise TypeError("Cannot cast kudu type <{0}>".format(_type_names[t]))
+
+
+cdef class TableAlterer:
+    """
+    Alters an existing table based on the provided steps.
+    """
+
+    def __cinit__(self, Table table):
+        self._table = table
+        self._new_name = None
+
+    def __dealloc__(self):
+        if self._alterer != NULL:
+            del self._alterer
+
+    cdef _init(self, KuduTableAlterer* alterer):
+        self._alterer = alterer
+
+    def rename(self, table_name):
+        """
+        Rename the table. Returns a reference to itself to facilitate chaining.
+
+        Parameters
+        ----------
+        table_name : str
+          The new name for the table.
+
+        Returns
+        -------
+        self : TableAlterer
+        """
+        self._alterer.RenameTo(tobytes(table_name))
+        self._new_name = table_name
+        return self
+
+    def add_column(self, name, type_=None, nullable=None, compression=None,
+                   encoding=None, default=None):
+        """
+        Add a new column to the table.
+
+        When adding a non-nullable column, you must specify the default value
+        of the new column using ColumnSpec.default(...) or the default
+        parameter in this method.
+
+        Parameters
+        ----------
+        name : string
+        type_ : string or KuduType
+          Data type e.g. 'int32' or kudu.int32
+        nullable : boolean, default None
+          New columns are nullable by default. Set boolean value for explicit
+          nullable / not-nullable
+        compression : string or int
+          One of kudu.COMPRESSION_* constants or their string equivalent.
+        encoding : string or int
+          One of kudu.ENCODING_* constants or their string equivalent.
+        default : obj
+          Use this to set the column default value
+
+        Returns
+        -------
+        spec : ColumnSpec
+        """
+        cdef:
+            ColumnSpec result = ColumnSpec()
+
+        result.spec = self._alterer.AddColumn(tobytes(name))
+
+        if type_ is not None:
+            result.type(type_)
+
+        if nullable is not None:
+            result.nullable(nullable)
+
+        if compression is not None:
+            result.compression(compression)
+
+        if encoding is not None:
+            result.encoding(encoding)
+
+        if default is not None:
+            result.default(default)
+
+        return result
+
+    def alter_column(self, name, rename_to=None):
+        """
+        Alter an existing column.
+
+        Parameters
+        ----------
+        name : string
+        rename_to : str
+          If set, the column will be renamed to this
+
+        Returns
+        -------
+        spec : ColumnSpec
+        """
+        cdef:
+            ColumnSpec result = ColumnSpec()
+
+        result.spec = self._alterer.AlterColumn(tobytes(name))
+
+        if rename_to:
+            result.rename(tobytes(rename_to))
+
+        return result
+
+    def drop_column(self, name):
+        """
+        Drops an existing column from the table.
+
+        Parameters
+        ----------
+        name : str
+          The name of the column to drop.
+
+        Returns
+        -------
+        self : TableAlterer
+        """
+        self._alterer.DropColumn(tobytes(name))
+        return self
+
+    def add_range_partition(self, lower_bound=None,
+                            upper_bound=None,
+                            lower_bound_type='inclusive',
+                            upper_bound_type='exclusive'):
+        """
+        Add a range partition to the table with the specified lower bound and
+        upper bound.
+
+        Multiple range partitions may be added as part of a single alter table
+        transaction by calling this method multiple times on the table alterer.
+
+        This client may immediately write and scan the new tablets when alter()
+        returns success, however other existing clients may have to wait for a
+        timeout period to elapse before the tablets become visible. This period
+        is configured by the master's 'table_locations_ttl_ms' flag, and
+        defaults to one hour.
+
+        Parameters
+        ----------
+        lower_bound : PartialRow/list/tuple/dict
+        upper_bound : PartialRow/list/tuple/dict
+        lower_bound_type : {'inclusive', 'exclusive'} or constants
+          kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND
+        upper_bound_type : {'inclusive', 'exclusive'} or constants
+          kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND
+
+        Returns
+        -------
+        self : TableAlterer
+        """
+        cdef:
+            PartialRow lbound
+            PartialRow ubound
+
+        # The alterer takes ownership of both bound rows, hence _own = 0.
+        if not isinstance(lower_bound, PartialRow):
+            lbound = self._table.schema.new_row(lower_bound)
+        else:
+            lbound = lower_bound
+        lbound._own = 0
+        if not isinstance(upper_bound, PartialRow):
+            ubound = self._table.schema.new_row(upper_bound)
+        else:
+            ubound = upper_bound
+        ubound._own = 0
+        self._alterer.AddRangePartition(
+            lbound.row,
+            ubound.row,
+            _check_convert_range_bound_type(lower_bound_type),
+            _check_convert_range_bound_type(upper_bound_type)
+        )
+        return self
+
+    def drop_range_partition(self, lower_bound=None,
+                             upper_bound=None,
+                             lower_bound_type='inclusive',
+                             upper_bound_type='exclusive'):
+        """
+        Drop the range partition from the table with the specified lower bound
+        and upper bound. The bounds must match an existing range partition
+        exactly, and may not span multiple range partitions.
+
+        Multiple range partitions may be dropped as part of a single alter
+        table transaction by calling this method multiple times on the
+        table alterer.
+
+        Parameters
+        ----------
+        lower_bound : PartialRow/list/tuple/dict
+        upper_bound : PartialRow/list/tuple/dict
+        lower_bound_type : {'inclusive', 'exclusive'} or constants
+          kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND
+        upper_bound_type : {'inclusive', 'exclusive'} or constants
+          kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND
+
+        Returns
+        -------
+        self : TableAlterer
+        """
+        cdef:
+            PartialRow lbound
+            PartialRow ubound
+
+        # The alterer takes ownership of both bound rows, hence _own = 0.
+        if not isinstance(lower_bound, PartialRow):
+            lbound = self._table.schema.new_row(lower_bound)
+        else:
+            lbound = lower_bound
+        lbound._own = 0
+        if not isinstance(upper_bound, PartialRow):
+            ubound = self._table.schema.new_row(upper_bound)
+        else:
+            ubound = upper_bound
+        ubound._own = 0
+        self._alterer.DropRangePartition(
+            lbound.row,
+            ubound.row,
+            _check_convert_range_bound_type(lower_bound_type),
+            _check_convert_range_bound_type(upper_bound_type)
+        )
+        return self
+
+    def alter(self):
+        """
+        Alter table. Returns a new table object upon completion of the alter.
+
+        Returns
+        -------
+        table : Table
+        """
+        check_status(self._alterer.Alter())
+        return self._table.parent.table(self._new_name or self._table.name)

http://git-wip-us.apache.org/repos/asf/kudu/blob/fec9b887/python/kudu/libkudu_client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index b8cbb60..ef887fd 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -182,7 +182,7 @@ cdef extern from "kudu/client/schema.h" namespace "kudu::client" nogil:
         KuduColumnSpec* Nullable()
         KuduColumnSpec* Type(DataType type_)
 
-        KuduColumnSpec* RenameTo(string& new_name)
+        KuduColumnSpec* RenameTo(const string& new_name)
 
     cdef cppclass KuduSchemaBuilder:
 
@@ -488,7 +488,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
 
         Status TableExists(const string& table_name, c_bool* exists)
 
-        KuduTableAlterer* NewTableAlterer()
+        KuduTableAlterer* NewTableAlterer(const string& table_name)
         Status IsAlterTableInProgress(const string& table_name,
                                       c_bool* alter_in_progress)
         uint64_t GetLatestObservedTimestamp()
@@ -534,22 +534,18 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         Status Create()
 
     cdef cppclass KuduTableAlterer:
-        # The name of the existing table to alter
-        KuduTableAlterer& table_name(string& name)
-
-        KuduTableAlterer& rename_table(string& name)
-
-        KuduTableAlterer& add_column(string& name, DataType type,
-                                     const void *default_value)
-        KuduTableAlterer& add_column(string& name, DataType type,
-                                     const void *default_value,
-                                     KuduColumnStorageAttributes attr)
-
-        KuduTableAlterer& add_nullable_column(string& name, DataType type)
-
-        KuduTableAlterer& drop_column(string& name)
-
-        KuduTableAlterer& rename_column(string& old_name, string& new_name)
+        KuduTableAlterer& RenameTo(const string& new_name)
+        KuduColumnSpec* AddColumn(const string& name)
+        KuduColumnSpec* AlterColumn(const string& name)
+        KuduTableAlterer& DropColumn(const string& name)
+        KuduTableAlterer& AddRangePartition(KuduPartialRow* lower_bound,
+                                            KuduPartialRow* upper_bound,
+                                            RangePartitionBound lower_bound_type,
+                                            RangePartitionBound upper_bound_type)
+        KuduTableAlterer& DropRangePartition(KuduPartialRow* lower_bound,
+                                             KuduPartialRow* upper_bound,
+                                             RangePartitionBound lower_bound_type,
+                                             RangePartitionBound upper_bound_type)
 
         KuduTableAlterer& wait(c_bool wait)
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fec9b887/python/kudu/schema.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/schema.pyx b/python/kudu/schema.pyx
index f24d42b..f4fa218 100644
--- a/python/kudu/schema.pyx
+++ b/python/kudu/schema.pyx
@@ -227,11 +227,12 @@ cdef class ColumnSpec:
         self.spec.Default(kval._value)
         return self
 
-    def clear_default(self):
+    def remove_default(self):
         """
         Remove a default value set.
         """
-        raise NotImplementedError
+        self.spec.RemoveDefault()
+        return self
 
     def compression(self, compression):
         """
@@ -326,8 +327,6 @@ cdef class ColumnSpec:
     def rename(self, new_name):
         """
         Change the column name.
-
-        TODO: Not implemented for table creation
         """
-        self.spec.RenameTo(new_name)
+        self.spec.RenameTo(tobytes(new_name))
         return self

http://git-wip-us.apache.org/repos/asf/kudu/blob/fec9b887/python/kudu/tests/test_client.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index 4896cb2..f9a408c 100644
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -294,6 +294,71 @@ class TestClient(KuduTestBase, unittest.TestCase):
         with self.assertRaises(TypeError):
             op['int_val'] = 'incorrect'
 
+    def test_alter_table_rename(self):
+        try:
+            self.client.create_table('alter-rename',
+                                     self.schema,
+                                     self.partitioning)
+            table = self.client.table('alter-rename')
+            alterer = self.client.new_table_alterer(table)
+            table = alterer.rename('alter-newname').alter()
+            self.assertEqual(table.name, 'alter-newname')
+        finally:
+            self.client.delete_table('alter-newname')
+
+    def test_alter_column(self):
+        try:
+            self.client.create_table('alter-column',
+                                     self.schema,
+                                     self.partitioning)
+            table = self.client.table('alter-column')
+            alterer = self.client.new_table_alterer(table)
+            alterer.alter_column('string_val', rename_to='string_val_renamed')
+            table = alterer.alter()
+
+            # Confirm column rename
+            col = table['string_val_renamed']
+
+        finally:
+            self.client.delete_table('alter-column')
+
+    def test_alter_table_add_drop_column(self):
+        table = self.client.table(self.ex_table)
+        alterer = self.client.new_table_alterer(table)
+        alterer.add_column('added-column', type_='int64', default=0)
+        table = alterer.alter()
+
+        # Confirm column was added
+        expected_repr = 'Column(added-column, parent={0}, type=int64)'\
+            .format(self.ex_table)
+        self.assertEqual(expected_repr, repr(table['added-column']))
+
+        alterer = self.client.new_table_alterer(table)
+        alterer.drop_column('added-column')
+        table = alterer.alter()
+
+        # Confirm column has been dropped.
+        with self.assertRaises(KeyError):
+            col = table['added-column']
+
+    def test_alter_table_add_drop_partition(self):
+        # Add Range Partition
+        table = self.client.table(self.ex_table)
+        alterer = self.client.new_table_alterer(table)
+        # add_range_partition() returns the alterer, so calls can be chained.
+        table = alterer.add_range_partition(
+            lower_bound={'key': 0},
+            upper_bound={'key': 100}
+        ).alter()
+        # TODO(jtbirdsell): Once C++ client can list partition schema
+        # then this test should confirm that the partition was added.
+        alterer = self.client.new_table_alterer(table)
+        self.assertIs(alterer.drop_range_partition(
+            lower_bound={'key': 0},
+            upper_bound={'key': 100}
+        ), alterer)
+        table = alterer.alter()
+
 
 class TestMonoDelta(unittest.TestCase):
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fec9b887/python/kudu/tests/test_schema.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_schema.py b/python/kudu/tests/test_schema.py
index 8df1a1e..d0624b5 100644
--- a/python/kudu/tests/test_schema.py
+++ b/python/kudu/tests/test_schema.py
@@ -138,6 +138,17 @@ class TestSchema(unittest.TestCase):
         # TODO(wesm): The C++ client does not give us an API to see the storage
         # attributes of a column
 
+    def test_unsupported_col_spec_methods_for_create_table(self):
+        builder = kudu.schema_builder()
+        builder.add_column('test', 'int64').rename('test')
+        with self.assertRaises(kudu.KuduNotSupported):
+            builder.build()
+
+        builder = kudu.schema_builder()
+        builder.add_column('test', 'int64').remove_default()
+        with self.assertRaises(kudu.KuduNotSupported):
+            builder.build()
+
     def test_set_column_spec_pk(self):
         builder = kudu.schema_builder()
         key = (builder.add_column('key', 'int64', nullable=False)
