Repository: kudu Updated Branches: refs/heads/master b333a36f1 -> d38a17d9b
KUDU-1680 - [python] Improve PartialRow Usability The current semantics for setting values in a PartialRow are not very elegant or comfortable for Python developers. This patch improves this API by providing the ability to "load" a PartialRow from a list, tuple or dictionary. Some existing tests were modified, however, some tests will continue to use the old API to ensure backwards compatibility. Change-Id: I5c9f57358e5048528398818fc80f32b27531a423 Reviewed-on: http://gerrit.cloudera.org:8080/4760 Tested-by: Kudu Jenkins Reviewed-by: Will Berkeley <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b7497b30 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b7497b30 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b7497b30 Branch: refs/heads/master Commit: b7497b304f65b814a2338e75ac3d277c0bebdb0b Parents: b333a36 Author: Jordan Birdsell <[email protected]> Authored: Wed Oct 19 20:04:58 2016 -0400 Committer: Will Berkeley <[email protected]> Committed: Fri Oct 28 13:42:54 2016 +0000 ---------------------------------------------------------------------- python/kudu/client.pxd | 42 ++++++ python/kudu/client.pyx | 217 +++++++++++++++++++++++-------- python/kudu/compat.py | 7 + python/kudu/schema.pxd | 3 - python/kudu/schema.pyx | 25 ++++ python/kudu/tests/test_client.py | 15 +-- python/kudu/tests/test_scanner.py | 10 +- python/kudu/tests/test_scantoken.py | 8 +- python/kudu/tests/util.py | 4 +- 9 files changed, 248 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/client.pxd ---------------------------------------------------------------------- diff --git a/python/kudu/client.pxd b/python/kudu/client.pxd new file mode 100644 index 0000000..375da00 --- /dev/null +++ b/python/kudu/client.pxd @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from libkudu_client cimport * +from kudu.schema cimport Schema + + +cdef class Session: + cdef: + shared_ptr[KuduSession] s + + +cdef class PartialRow: + cdef: + KuduPartialRow* row + Schema schema + public bint _own + + cpdef set_field(self, key, value) + + cpdef set_loc(self, int i, value) + + cpdef set_field_null(self, key) + + cpdef set_loc_null(self, int i) + + cdef add_to_session(self, Session s) http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/client.pyx ---------------------------------------------------------------------- diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx index ce897a2..97ae711 100644 --- a/python/kudu/client.pyx +++ b/python/kudu/client.pyx @@ -26,7 +26,7 @@ cimport cpython from cython.operator cimport dereference as deref from libkudu_client cimport * -from kudu.compat import tobytes, frombytes +from kudu.compat import tobytes, frombytes, dict_iter from kudu.schema cimport Schema, ColumnSchema from kudu.errors cimport check_status from kudu.util import to_unixtime_micros, from_unixtime_micros, from_hybridtime @@ -648,45 +648,77 @@ cdef class Table: def drop(self): raise NotImplementedError - def new_insert(self): + def new_insert(self, record=None): """ Create a new Insert operation. Pass the completed Insert to a Session. + If a record is provided, a PartialRow will be initialized with values + from the input record. The record can be in the form of a tuple, dict, + or list. Dictionary keys can be either column names, indexes, or a + mix of both names and indexes. + + Parameters + ---------- + record : tuple/list/dict Returns ------- insert : Insert """ - return Insert(self) + return Insert(self, record) - def new_upsert(self): + def new_upsert(self, record=None): """ Create a new Upsert operation. Pass the completed Upsert to a Session. + If a record is provided, a PartialRow will be initialized with values + from the input record. The record can be in the form of a tuple, dict, + or list. Dictionary keys can be either column names, indexes, or a + mix of both names and indexes. + + Parameters + ---------- + record : tuple/list/dict Returns ------- upsert : Upsert """ - return Upsert(self) + return Upsert(self, record) - def new_update(self): + def new_update(self, record=None): """ Create a new Update operation. Pass the completed Update to a Session. + If a record is provided, a PartialRow will be initialized with values + from the input record. The record can be in the form of a tuple, dict, + or list. Dictionary keys can be either column names, indexes, or a + mix of both names and indexes. + + Parameters + ---------- + record : tuple/list/dict Returns ------- update : Update """ - return Update(self) + return Update(self, record) - def new_delete(self): + def new_delete(self, record=None): """ Create a new Delete operation. Pass the completed Update to a Session. + If a record is provided, a PartialRow will be initialized with values + from the input record. The record can be in the form of a tuple, dict, + or list. Dictionary keys can be either column names, indexes, or a + mix of both names and indexes. + + Parameters + ---------- + record : tuple/list/dict Returns ------- delete : Delete """ - return Delete(self) + return Delete(self, record) def scanner(self): """ @@ -962,8 +994,6 @@ cdef class Session: Wrapper for a client KuduSession to build up write operations to interact with the cluster. """ - cdef: - shared_ptr[KuduSession] s def __cinit__(self): pass @@ -1405,37 +1435,61 @@ cdef class Scanner: def new_bound(self): """ - Returns a new instance of a ScanBound (subclass of PartialRow) to be - later set with add_lower_bound()/add_exclusive_upper_bound(). + Returns a new instance of a PartialRow to be later set with + add_lower_bound()/add_exclusive_upper_bound(). Returns ------- - bound : ScanBound + bound : PartialRow """ - return ScanBound(self.table) + return self.table.schema.new_row() - def add_lower_bound(self, ScanBound bound): + def add_lower_bound(self, bound): """ Sets the (inclusive) lower bound of the scan. Returns a reference to itself to facilitate chaining. + Parameters + ---------- + bound : PartialRow/tuple/list/dictionary + Returns ------- self : Scanner """ - check_status(self.scanner.AddLowerBound(deref(bound.row))) + cdef: + PartialRow row + # Convert record to bound + if not isinstance(bound, PartialRow): + row = self.table.schema.new_row(bound) + else: + row = bound + + check_status(self.scanner.AddLowerBound(deref(row.row))) return self - def add_exclusive_upper_bound(self, ScanBound bound): + def add_exclusive_upper_bound(self, bound): """ Sets the (exclusive) upper bound of the scan. Returns a reference to itself to facilitate chaining. + Parameters + ---------- + bound : PartialRow/tuple/list/dictionary + Returns ------- self : Scanner """ - check_status(self.scanner.AddExclusiveUpperBound(deref(bound.row))) + cdef: + PartialRow row + # Convert record to bound + if not isinstance(bound, PartialRow): + row = self.table.schema.new_row(bound) + else: + row = bound + + check_status(self.scanner.AddExclusiveUpperBound(deref(row.row))) return self def get_projection_schema(self): @@ -1817,45 +1871,61 @@ cdef class ScanTokenBuilder: def new_bound(self): """ - Returns a new instance of a ScanBound (subclass of PartialRow) to be - later set with add_lower_bound()/add_upper_bound(). + Returns a new instance of a PartialRow to be later set with + add_lower_bound()/add_upper_bound(). Returns ------- - bound : ScanBound + bound : PartialRow """ - return ScanBound(self._table) + return self._table.schema.new_row() - def add_lower_bound(self, ScanBound bound): + def add_lower_bound(self, bound): """ Sets the lower bound of the scan. Returns a reference to itself to facilitate chaining. Parameters ---------- - bound : ScanBound + bound : PartialRow/list/tuple/dict Returns ------- self : ScanTokenBuilder """ - check_status(self._builder.AddLowerBound(deref(bound.row))) + cdef: + PartialRow row + # Convert record to bound + if not isinstance(bound, PartialRow): + row = self._table.schema.new_row(bound) + else: + row = bound + + check_status(self._builder.AddLowerBound(deref(row.row))) return self - def add_upper_bound(self, ScanBound bound): + def add_upper_bound(self, bound): """ Sets the upper bound of the scan. Returns a reference to itself to facilitate chaining. Parameters ---------- - bound : ScanBound + bound : PartialRow/list/tuple/dict Returns ------- self : ScanTokenBuilder """ - check_status(self._builder.AddUpperBound(deref(bound.row))) + cdef: + PartialRow row + # Convert record to bound + if not isinstance(bound, PartialRow): + row = self._table.schema.new_row(bound) + else: + row = bound + + check_status(self._builder.AddUpperBound(deref(row.row))) return self def set_cache_blocks(self, cache_blocks): @@ -1973,13 +2043,15 @@ cdef class KuduError: cdef class PartialRow: - cdef: - Table table - KuduPartialRow* row - def __cinit__(self, Table table): + def __cinit__(self, Schema schema): # This gets called before any subclass cinit methods - self.table = table + self.schema = schema + self._own = 1 + + def __dealloc__(self): + if self._own and self.row != NULL: + del self.row def __setitem__(self, key, value): if isinstance(key, basestring): @@ -1987,15 +2059,42 @@ cdef class PartialRow: else: self.set_loc(key, value) + def from_record(self, record): + """ + Initializes PartialRow with values from an input record. The record + can be in the form of a tuple, dict, or list. Dictionary keys can + be either column names or indexes. + + Parameters + ---------- + record : tuple/list/dict + + Returns + ------- + self : PartialRow + """ + if isinstance(record, (tuple, list)): + for indx, val in enumerate(record): + self[indx] = val + elif isinstance(record, dict): + for key, val in dict_iter(record): + self[key] = val + else: + raise TypeError("Invalid record type <{0}> for " + + "PartialRow.from_record." + .format(type(record).__name__)) + + return self + cpdef set_field(self, key, value): cdef: - int i = self.table.schema.get_loc(key) + int i = self.schema.get_loc(key) self.set_loc(i, value) cpdef set_loc(self, int i, value): cdef: - DataType t = self.table.schema.loc_type(i) + DataType t = self.schema.loc_type(i) cdef Slice* slc if value is None: @@ -2044,22 +2143,19 @@ cdef class PartialRow: cdef add_to_session(self, Session s): pass -cdef class ScanBound(PartialRow): - def __cinit__(self, Table table): - self.row = self.table.schema.new_row() - def __dealloc__(self): - del self.row - -cdef class WriteOperation(PartialRow): +cdef class WriteOperation: cdef: # Whether the WriteOperation has been applied. # Set by subclasses. bint applied KuduWriteOperation* op + PartialRow py_row - def __cinit__(self, Table table): + def __cinit__(self, Table table, record=None): self.applied = 0 + self.py_row = PartialRow(table.schema) + self.py_row._own = 0 cdef add_to_session(self, Session s): if self.applied: @@ -2069,38 +2165,51 @@ cdef class WriteOperation(PartialRow): self.op = NULL self.applied = 1 + def __setitem__(self, key, value): + # Since the write operation is no longer a sub-class of the PartialRow + # we need to explicitly retain the item setting functionality and API + # style. + self.py_row[key] = value + cdef class Insert(WriteOperation): - def __cinit__(self, Table table): - self.op = self.table.ptr().NewInsert() - self.row = self.op.mutable_row() + def __cinit__(self, Table table, record=None): + self.op = table.ptr().NewInsert() + self.py_row.row = self.op.mutable_row() + if record: + self.py_row.from_record(record) def __dealloc__(self): del self.op cdef class Upsert(WriteOperation): - def __cinit__(self, Table table): + def __cinit__(self, Table table, record=None): self.op = table.ptr().NewUpsert() - self.row = self.op.mutable_row() - + self.py_row.row = self.op.mutable_row() + if record: + self.py_row.from_record(record) def __dealloc__(self): del self.op cdef class Update(WriteOperation): - def __cinit__(self, Table table): + def __cinit__(self, Table table, record=None): self.op = table.ptr().NewUpdate() - self.row = self.op.mutable_row() + self.py_row.row = self.op.mutable_row() + if record: + self.py_row.from_record(record) def __dealloc__(self): del self.op cdef class Delete(WriteOperation): - def __cinit__(self, Table table): + def __cinit__(self, Table table, record=None): self.op = table.ptr().NewDelete() - self.row = self.op.mutable_row() + self.py_row.row = self.op.mutable_row() + if record: + self.py_row.from_record(record) def __dealloc__(self): del self.op http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/compat.py ---------------------------------------------------------------------- diff --git a/python/kudu/compat.py b/python/kudu/compat.py index 7c3556f..c3343a6 100644 --- a/python/kudu/compat.py +++ b/python/kudu/compat.py @@ -69,6 +69,10 @@ if PY2: def frombytes(o): return o + + def dict_iter(o): + return o.items() + else: unicode_type = str def lzip(*x): @@ -89,6 +93,9 @@ else: def frombytes(o): return o.decode('utf8') + def dict_iter(o): + return list(o.items()) + integer_types = six.integer_types if np is not None: http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/schema.pxd ---------------------------------------------------------------------- diff --git a/python/kudu/schema.pxd b/python/kudu/schema.pxd index 085aaa0..b70f8ad 100644 --- a/python/kudu/schema.pxd +++ b/python/kudu/schema.pxd @@ -57,6 +57,3 @@ cdef class Schema: cdef inline DataType loc_type(self, int i): return self.schema.Column(i).type() - - cdef inline KuduPartialRow* new_row(self): - return self.schema.NewRow() http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/schema.pyx ---------------------------------------------------------------------- diff --git a/python/kudu/schema.pyx b/python/kudu/schema.pyx index a5ef971..cbef9d9 100644 --- a/python/kudu/schema.pyx +++ b/python/kudu/schema.pyx @@ -23,6 +23,7 @@ from cython.operator cimport dereference as deref from kudu.compat import tobytes, frombytes from kudu.schema cimport * from kudu.errors cimport check_status +from kudu.client cimport PartialRow import six @@ -539,6 +540,30 @@ cdef class Schema: return result + def new_row(self, record=None): + """ + Create a new row corresponding to this schema. If a record is provided, + a PartialRow will be initialized with values from the input record. + The record can be in the form of a tuple, dict, or list. Dictionary + keys can be either column names, indexes, or a mix of both names and + indexes. + + Parameters + ---------- + record : tuple/list/dict + + Returns + ------- + row : PartialRow + """ + result = PartialRow(self) + result.row = self.schema.NewRow() + + if record: + result.from_record(record) + + return result + def primary_key_indices(self): """ Return the indices of the columns used as primary keys http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/tests/test_client.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py index b396d94..1e112ae 100644 --- a/python/kudu/tests/test_client.py +++ b/python/kudu/tests/test_client.py @@ -150,10 +150,7 @@ class TestClient(KuduTestBase, unittest.TestCase): table = self.client.table(self.ex_table) session = self.client.new_session() for i in range(nrows): - op = table.new_insert() - op['key'] = i - op['int_val'] = i * 2 - op['string_val'] = 'hello_%d' % i + op = table.new_insert((i, i*2, 'hello_%d' % i)) session.apply(op) # Cannot apply the same insert twice, C++ client does not indicate an @@ -173,10 +170,9 @@ class TestClient(KuduTestBase, unittest.TestCase): op['unixtime_micros_val'] = datetime.datetime(2016, 10, 30, 10, 12) session.apply(op) - op = table.new_upsert() - op['key'] = 2 - op['int_val'] = 222 - op['string_val'] = 'upserted' + op = table.new_upsert({0: 2, + 1: 222, + 2: 'upserted'}) session.apply(op) session.flush() @@ -190,8 +186,7 @@ class TestClient(KuduTestBase, unittest.TestCase): # Delete the rows we just wrote for i in range(nrows): - op = table.new_delete() - op['key'] = i + op = table.new_delete({'key': i}) session.apply(op) session.flush() http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/tests/test_scanner.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py index f010f36..ebacb72 100644 --- a/python/kudu/tests/test_scanner.py +++ b/python/kudu/tests/test_scanner.py @@ -121,13 +121,9 @@ class TestScanner(TestScanBase): def test_scan_with_bounds(self): scanner = self.table.scanner() - scanner.set_fault_tolerant() - lower_bound = scanner.new_bound() - lower_bound['key'] = 50 - scanner.add_lower_bound(lower_bound) - upper_bound = scanner.new_bound() - upper_bound['key'] = 55 - scanner.add_exclusive_upper_bound(upper_bound) + scanner.set_fault_tolerant()\ + .add_lower_bound({'key': 50})\ + .add_exclusive_upper_bound({'key': 55}) scanner.open() tuples = scanner.read_all_tuples() http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/tests/test_scantoken.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py index da879d5..897d780 100644 --- a/python/kudu/tests/test_scantoken.py +++ b/python/kudu/tests/test_scantoken.py @@ -97,13 +97,9 @@ class TestScanToken(TestScanBase): in parallel with seperate clients. """ builder = self.table.scan_token_builder() - lower_bound = builder.new_bound() - lower_bound['key'] = 50 - upper_bound = builder.new_bound() - upper_bound['key'] = 55 builder.set_fault_tolerant()\ - .add_lower_bound(lower_bound)\ - .add_upper_bound(upper_bound) + .add_lower_bound([50])\ + .add_upper_bound([55]) # Serialize execute and verify self._subtest_serialize_thread_and_verify(builder.build(), http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/tests/util.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/util.py b/python/kudu/tests/util.py index 9f64e39..230f723 100644 --- a/python/kudu/tests/util.py +++ b/python/kudu/tests/util.py @@ -92,9 +92,7 @@ class TestScanBase(KuduTestBase, unittest.TestCase): ] session = self.client.new_session() for row in self.type_test_rows: - op = self.type_table.new_insert() - for idx, val in enumerate(row): - op[idx] = val + op = self.type_table.new_insert(row) session.apply(op) session.flush()
