Repository: kudu
Updated Branches:
  refs/heads/master b333a36f1 -> d38a17d9b


KUDU-1680 - [python] Improve PartialRow Usability

The current semantics for setting values in a PartialRow are not
very elegant or comfortable for Python developers. This patch
improves this API by providing the ability to "load" a PartialRow
from a list, tuple or dictionary, and scan bounds can now be given in
the same forms. Some existing tests were modified; others continue to
use the old API to ensure backwards compatibility.

Change-Id: I5c9f57358e5048528398818fc80f32b27531a423
Reviewed-on: http://gerrit.cloudera.org:8080/4760
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b7497b30
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b7497b30
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b7497b30

Branch: refs/heads/master
Commit: b7497b304f65b814a2338e75ac3d277c0bebdb0b
Parents: b333a36
Author: Jordan Birdsell <[email protected]>
Authored: Wed Oct 19 20:04:58 2016 -0400
Committer: Will Berkeley <[email protected]>
Committed: Fri Oct 28 13:42:54 2016 +0000

----------------------------------------------------------------------
 python/kudu/client.pxd              |  42 ++++++
 python/kudu/client.pyx              | 217 +++++++++++++++++++++++--------
 python/kudu/compat.py               |   7 +
 python/kudu/schema.pxd              |   3 -
 python/kudu/schema.pyx              |  25 ++++
 python/kudu/tests/test_client.py    |  15 +--
 python/kudu/tests/test_scanner.py   |  10 +-
 python/kudu/tests/test_scantoken.py |   8 +-
 python/kudu/tests/util.py           |   4 +-
 9 files changed, 248 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/client.pxd b/python/kudu/client.pxd
new file mode 100644
index 0000000..375da00
--- /dev/null
+++ b/python/kudu/client.pxd
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from libkudu_client cimport *
+from kudu.schema cimport Schema
+
+
+cdef class Session:  # needed by PartialRow.add_to_session below
+    cdef:
+        shared_ptr[KuduSession] s
+
+
+cdef class PartialRow:
+    cdef:
+        KuduPartialRow* row
+        Schema schema  # resolves column names/types in set_field/set_loc
+        public bint _own  # if true, __dealloc__ deletes row
+
+    cpdef set_field(self, key, value)
+
+    cpdef set_loc(self, int i, value)
+
+    cpdef set_field_null(self, key)
+
+    cpdef set_loc_null(self, int i)
+
+    cdef add_to_session(self, Session s)

http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index ce897a2..97ae711 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -26,7 +26,7 @@ cimport cpython
 from cython.operator cimport dereference as deref
 
 from libkudu_client cimport *
-from kudu.compat import tobytes, frombytes
+from kudu.compat import tobytes, frombytes, dict_iter
 from kudu.schema cimport Schema, ColumnSchema
 from kudu.errors cimport check_status
 from kudu.util import to_unixtime_micros, from_unixtime_micros, from_hybridtime
@@ -648,45 +648,77 @@ cdef class Table:
     def drop(self):
         raise NotImplementedError
 
-    def new_insert(self):
+    def new_insert(self, record=None):
         """
         Create a new Insert operation. Pass the completed Insert to a Session.
+        If a record is provided, a PartialRow will be initialized with values
+        from the input record. The record can be in the form of a tuple, dict,
+        or list. Dictionary keys can be either column names, indexes, or a
+        mix of both names and indexes.
+
+        Parameters
+        ----------
+        record : tuple/list/dict, optional
 
         Returns
         -------
         insert : Insert
         """
-        return Insert(self)
+        return Insert(self, record)
 
-    def new_upsert(self):
+    def new_upsert(self, record=None):
         """
         Create a new Upsert operation. Pass the completed Upsert to a Session.
+        If a record is provided, a PartialRow will be initialized with values
+        from the input record. The record can be in the form of a tuple, dict,
+        or list. Dictionary keys can be either column names, indexes, or a
+        mix of both names and indexes.
+
+        Parameters
+        ----------
+        record : tuple/list/dict, optional
 
         Returns
         -------
         upsert : Upsert
         """
-        return Upsert(self)
+        return Upsert(self, record)
 
-    def new_update(self):
+    def new_update(self, record=None):
         """
         Create a new Update operation. Pass the completed Update to a Session.
+        If a record is provided, a PartialRow will be initialized with values
+        from the input record. The record can be in the form of a tuple, dict,
+        or list. Dictionary keys can be either column names, indexes, or a
+        mix of both names and indexes.
+
+        Parameters
+        ----------
+        record : tuple/list/dict, optional
 
         Returns
         -------
         update : Update
         """
-        return Update(self)
+        return Update(self, record)
 
-    def new_delete(self):
+    def new_delete(self, record=None):
         """
-        Create a new Delete operation. Pass the completed Update to a Session.
+        Create a new Delete operation. Pass the completed Delete to a Session.
+        If a record is provided, a PartialRow will be initialized with values
+        from the input record. The record can be in the form of a tuple, dict,
+        or list. Dictionary keys can be either column names, indexes, or a
+        mix of both names and indexes.
+
+        Parameters
+        ----------
+        record : tuple/list/dict, optional
 
         Returns
         -------
         delete : Delete
         """
-        return Delete(self)
+        return Delete(self, record)
 
     def scanner(self):
         """
@@ -962,8 +994,6 @@ cdef class Session:
     Wrapper for a client KuduSession to build up write operations to interact
     with the cluster.
     """
-    cdef:
-        shared_ptr[KuduSession] s
 
     def __cinit__(self):
         pass
@@ -1405,37 +1435,61 @@ cdef class Scanner:
 
     def new_bound(self):
         """
-        Returns a new instance of a ScanBound (subclass of PartialRow) to be
-        later set with add_lower_bound()/add_exclusive_upper_bound().
+        Returns a new instance of a PartialRow to be later set with
+        add_lower_bound()/add_exclusive_upper_bound().
 
         Returns
         -------
-        bound : ScanBound
+        bound : PartialRow
         """
-        return ScanBound(self.table)
+        return self.table.schema.new_row()
 
-    def add_lower_bound(self, ScanBound bound):
+    def add_lower_bound(self, bound):
         """
         Sets the (inclusive) lower bound of the scan.
         Returns a reference to itself to facilitate chaining.
 
+        Parameters
+        ----------
+        bound : PartialRow/tuple/list/dict
+
         Returns
         -------
         self : Scanner
         """
-        check_status(self.scanner.AddLowerBound(deref(bound.row)))
+        cdef:
+            PartialRow row
+        # Build a PartialRow from a tuple/list/dict record unless given one
+        if not isinstance(bound, PartialRow):
+            row = self.table.schema.new_row(bound)
+        else:
+            row = bound
+
+        check_status(self.scanner.AddLowerBound(deref(row.row)))
         return self
 
-    def add_exclusive_upper_bound(self, ScanBound bound):
+    def add_exclusive_upper_bound(self, bound):
         """
         Sets the (exclusive) upper bound of the scan.
         Returns a reference to itself to facilitate chaining.
 
+        Parameters
+        ----------
+        bound : PartialRow/tuple/list/dict
+
         Returns
         -------
         self : Scanner
         """
-        check_status(self.scanner.AddExclusiveUpperBound(deref(bound.row)))
+        cdef:
+            PartialRow row
+        # Build a PartialRow from a tuple/list/dict record unless given one
+        if not isinstance(bound, PartialRow):
+            row = self.table.schema.new_row(bound)
+        else:
+            row = bound
+
+        check_status(self.scanner.AddExclusiveUpperBound(deref(row.row)))
         return self
 
     def get_projection_schema(self):
@@ -1817,45 +1871,61 @@ cdef class ScanTokenBuilder:
 
     def new_bound(self):
         """
-        Returns a new instance of a ScanBound (subclass of PartialRow) to be
-        later set with add_lower_bound()/add_upper_bound().
+        Returns a new instance of a PartialRow to be later set with
+        add_lower_bound()/add_upper_bound().
 
         Returns
         -------
-        bound : ScanBound
+        bound : PartialRow
         """
-        return ScanBound(self._table)
+        return self._table.schema.new_row()
 
-    def add_lower_bound(self, ScanBound bound):
+    def add_lower_bound(self, bound):
         """
         Sets the lower bound of the scan.
         Returns a reference to itself to facilitate chaining.
 
         Parameters
         ----------
-        bound : ScanBound
+        bound : PartialRow/tuple/list/dict
 
         Returns
         -------
         self : ScanTokenBuilder
         """
-        check_status(self._builder.AddLowerBound(deref(bound.row)))
+        cdef:
+            PartialRow row
+        # Build a PartialRow from a tuple/list/dict record unless given one
+        if not isinstance(bound, PartialRow):
+            row = self._table.schema.new_row(bound)
+        else:
+            row = bound
+
+        check_status(self._builder.AddLowerBound(deref(row.row)))
         return self
 
-    def add_upper_bound(self, ScanBound bound):
+    def add_upper_bound(self, bound):
         """
         Sets the upper bound of the scan.
         Returns a reference to itself to facilitate chaining.
 
         Parameters
         ----------
-        bound : ScanBound
+        bound : PartialRow/tuple/list/dict
 
         Returns
         -------
         self : ScanTokenBuilder
         """
-        check_status(self._builder.AddUpperBound(deref(bound.row)))
+        cdef:
+            PartialRow row
+        # Build a PartialRow from a tuple/list/dict record unless given one
+        if not isinstance(bound, PartialRow):
+            row = self._table.schema.new_row(bound)
+        else:
+            row = bound
+
+        check_status(self._builder.AddUpperBound(deref(row.row)))
         return self
 
     def set_cache_blocks(self, cache_blocks):
@@ -1973,13 +2043,15 @@ cdef class KuduError:
 
 
 cdef class PartialRow:
-    cdef:
-        Table table
-        KuduPartialRow* row
 
-    def __cinit__(self, Table table):
-        # This gets called before any subclass cinit methods
-        self.table = table
+    def __cinit__(self, Schema schema):
+        self.schema = schema
+        # Free self.row in __dealloc__ unless a caller clears _own.
+        self._own = 1
+
+    def __dealloc__(self):
+        if self._own and self.row != NULL:
+            del self.row
 
     def __setitem__(self, key, value):
         if isinstance(key, basestring):
@@ -1987,15 +2059,42 @@ cdef class PartialRow:
         else:
             self.set_loc(key, value)
 
+    def from_record(self, record):
+        """
+        Initializes PartialRow with values from an input record. The record
+        can be in the form of a tuple, dict, or list. Dictionary keys can
+        be either column names or indexes.
+
+        Parameters
+        ----------
+        record : tuple/list/dict
+
+        Returns
+        -------
+        self : PartialRow
+        """
+        if isinstance(record, (tuple, list)):
+            for indx, val in enumerate(record):
+                self[indx] = val
+        elif isinstance(record, dict):
+            for key, val in dict_iter(record):
+                self[key] = val
+        else:
+            raise TypeError("Invalid record type <{0}> for "
+                            "PartialRow.from_record."
+                            .format(type(record).__name__))
+
+        return self
+
     cpdef set_field(self, key, value):
         cdef:
-            int i = self.table.schema.get_loc(key)
+            int i = self.schema.get_loc(key)
 
         self.set_loc(i, value)
 
     cpdef set_loc(self, int i, value):
         cdef:
-            DataType t = self.table.schema.loc_type(i)
+            DataType t = self.schema.loc_type(i)
             cdef Slice* slc
 
         if value is None:
@@ -2044,22 +2143,19 @@ cdef class PartialRow:
     cdef add_to_session(self, Session s):
         pass
 
-cdef class ScanBound(PartialRow):
-    def __cinit__(self, Table table):
-        self.row = self.table.schema.new_row()
 
-    def __dealloc__(self):
-        del self.row
-
-cdef class WriteOperation(PartialRow):
+cdef class WriteOperation:
     cdef:
         # Whether the WriteOperation has been applied.
         # Set by subclasses.
         bint applied
         KuduWriteOperation* op
+        PartialRow py_row
 
-    def __cinit__(self, Table table):
+    def __cinit__(self, Table table, record=None):
         self.applied = 0
+        self.py_row = PartialRow(table.schema)
+        self.py_row._own = 0
 
     cdef add_to_session(self, Session s):
         if self.applied:
@@ -2069,38 +2165,67 @@ cdef class WriteOperation(PartialRow):
         self.op = NULL
         self.applied = 1
 
+    def __setitem__(self, key, value):
+        # Since the write operation is no longer a sub-class of the PartialRow
+        # we need to explicitly retain the item setting functionality and API
+        # style.
+        self.py_row[key] = value
+
+    # Forward the remaining PartialRow setters too, so code written against
+    # the old API (when WriteOperation subclassed PartialRow) keeps working,
+    # e.g. op.set_field_null('col').
+    def set_field(self, key, value):
+        self.py_row.set_field(key, value)
+
+    def set_loc(self, int i, value):
+        self.py_row.set_loc(i, value)
+
+    def set_field_null(self, key):
+        self.py_row.set_field_null(key)
+
+    def set_loc_null(self, int i):
+        self.py_row.set_loc_null(i)
+
 
 cdef class Insert(WriteOperation):
-    def __cinit__(self, Table table):
-        self.op = self.table.ptr().NewInsert()
-        self.row = self.op.mutable_row()
+    def __cinit__(self, Table table, record=None):
+        self.op = table.ptr().NewInsert()
+        self.py_row.row = self.op.mutable_row()
+        if record is not None:
+            self.py_row.from_record(record)
 
     def __dealloc__(self):
         del self.op
 
 
 cdef class Upsert(WriteOperation):
-    def __cinit__(self, Table table):
+    def __cinit__(self, Table table, record=None):
         self.op = table.ptr().NewUpsert()
-        self.row = self.op.mutable_row()
+        self.py_row.row = self.op.mutable_row()
+        if record is not None:
+            self.py_row.from_record(record)
 
     def __dealloc__(self):
         del self.op
 
 
 cdef class Update(WriteOperation):
-    def __cinit__(self, Table table):
+    def __cinit__(self, Table table, record=None):
         self.op = table.ptr().NewUpdate()
-        self.row = self.op.mutable_row()
+        self.py_row.row = self.op.mutable_row()
+        if record is not None:
+            self.py_row.from_record(record)
 
     def __dealloc__(self):
         del self.op
 
 
 cdef class Delete(WriteOperation):
-    def __cinit__(self, Table table):
+    def __cinit__(self, Table table, record=None):
         self.op = table.ptr().NewDelete()
-        self.row = self.op.mutable_row()
+        self.py_row.row = self.op.mutable_row()
+        if record is not None:
+            self.py_row.from_record(record)
 
     def __dealloc__(self):
         del self.op

http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/compat.py
----------------------------------------------------------------------
diff --git a/python/kudu/compat.py b/python/kudu/compat.py
index 7c3556f..c3343a6 100644
--- a/python/kudu/compat.py
+++ b/python/kudu/compat.py
@@ -69,6 +69,10 @@ if PY2:
 
     def frombytes(o):
         return o
+
+    def dict_iter(o):
+        return o.items()  # on Python 2 dict.items() already returns a list
+
 else:
     unicode_type = str
     def lzip(*x):
@@ -89,6 +93,9 @@ else:
     def frombytes(o):
         return o.decode('utf8')
 
+    def dict_iter(o):
+        return list(o.items())  # copy the items view into a list, as on PY2
+
 
 integer_types = six.integer_types
 if np is not None:

http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/schema.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/schema.pxd b/python/kudu/schema.pxd
index 085aaa0..b70f8ad 100644
--- a/python/kudu/schema.pxd
+++ b/python/kudu/schema.pxd
@@ -57,6 +57,3 @@ cdef class Schema:
 
     cdef inline DataType loc_type(self, int i):
         return self.schema.Column(i).type()
-
-    cdef inline KuduPartialRow* new_row(self):
-        return self.schema.NewRow()

http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/schema.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/schema.pyx b/python/kudu/schema.pyx
index a5ef971..cbef9d9 100644
--- a/python/kudu/schema.pyx
+++ b/python/kudu/schema.pyx
@@ -23,6 +23,7 @@ from cython.operator cimport dereference as deref
 from kudu.compat import tobytes, frombytes
 from kudu.schema cimport *
 from kudu.errors cimport check_status
+from kudu.client cimport PartialRow
 
 import six
 
@@ -539,6 +540,30 @@ cdef class Schema:
 
         return result
 
+    def new_row(self, record=None):
+        """
+        Create a new row corresponding to this schema. If a record is provided,
+        a PartialRow will be initialized with values from the input record.
+        The record can be in the form of a tuple, dict, or list. Dictionary
+        keys can be either column names, indexes, or a mix of both names and
+        indexes.
+
+        Parameters
+        ----------
+        record : tuple/list/dict, optional
+
+        Returns
+        -------
+        row : PartialRow
+        """
+        cdef PartialRow result = PartialRow(self)
+        result.row = self.schema.NewRow()
+
+        if record is not None:
+            result.from_record(record)
+
+        return result
+
     def primary_key_indices(self):
         """
         Return the indices of the columns used as primary keys

http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/tests/test_client.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index b396d94..1e112ae 100644
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -150,10 +150,7 @@ class TestClient(KuduTestBase, unittest.TestCase):
         table = self.client.table(self.ex_table)
         session = self.client.new_session()
         for i in range(nrows):
-            op = table.new_insert()
-            op['key'] = i
-            op['int_val'] = i * 2
-            op['string_val'] = 'hello_%d' % i
+            op = table.new_insert((i, i*2, 'hello_%d' % i))  # tuple record
             session.apply(op)
 
         # Cannot apply the same insert twice, C++ client does not indicate an
@@ -173,10 +170,9 @@ class TestClient(KuduTestBase, unittest.TestCase):
         op['unixtime_micros_val'] = datetime.datetime(2016, 10, 30, 10, 12)
         session.apply(op)
 
-        op = table.new_upsert()
-        op['key'] = 2
-        op['int_val'] = 222
-        op['string_val'] = 'upserted'
+        op = table.new_upsert({0: 2,  # dict record keyed by column index
+                               1: 222,
+                               2: 'upserted'})
         session.apply(op)
         session.flush()
 
@@ -190,8 +186,7 @@ class TestClient(KuduTestBase, unittest.TestCase):
 
         # Delete the rows we just wrote
         for i in range(nrows):
-            op = table.new_delete()
-            op['key'] = i
+            op = table.new_delete({'key': i})  # dict keyed by column name
             session.apply(op)
         session.flush()
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/tests/test_scanner.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py
index f010f36..ebacb72 100644
--- a/python/kudu/tests/test_scanner.py
+++ b/python/kudu/tests/test_scanner.py
@@ -121,13 +121,9 @@ class TestScanner(TestScanBase):
 
     def test_scan_with_bounds(self):
         scanner = self.table.scanner()
-        scanner.set_fault_tolerant()
-        lower_bound = scanner.new_bound()
-        lower_bound['key'] = 50
-        scanner.add_lower_bound(lower_bound)
-        upper_bound = scanner.new_bound()
-        upper_bound['key'] = 55
-        scanner.add_exclusive_upper_bound(upper_bound)
+        (scanner.set_fault_tolerant()
+            .add_lower_bound({'key': 50})  # bounds given as dict records
+            .add_exclusive_upper_bound({'key': 55}))
         scanner.open()
 
         tuples = scanner.read_all_tuples()

http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/tests/test_scantoken.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py
index da879d5..897d780 100644
--- a/python/kudu/tests/test_scantoken.py
+++ b/python/kudu/tests/test_scantoken.py
@@ -97,13 +97,9 @@ class TestScanToken(TestScanBase):
-        in parallel with seperate clients.
+        in parallel with separate clients.
         """
         builder = self.table.scan_token_builder()
-        lower_bound = builder.new_bound()
-        lower_bound['key'] = 50
-        upper_bound = builder.new_bound()
-        upper_bound['key'] = 55
-        builder.set_fault_tolerant()\
-            .add_lower_bound(lower_bound)\
-            .add_upper_bound(upper_bound)
+        (builder.set_fault_tolerant()
+            .add_lower_bound([50])  # bounds given as list records
+            .add_upper_bound([55]))
 
         # Serialize execute and verify
         self._subtest_serialize_thread_and_verify(builder.build(),

http://git-wip-us.apache.org/repos/asf/kudu/blob/b7497b30/python/kudu/tests/util.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/util.py b/python/kudu/tests/util.py
index 9f64e39..230f723 100644
--- a/python/kudu/tests/util.py
+++ b/python/kudu/tests/util.py
@@ -92,9 +92,7 @@ class TestScanBase(KuduTestBase, unittest.TestCase):
         ]
         session = self.client.new_session()
         for row in self.type_test_rows:
-            op = self.type_table.new_insert()
-            for idx, val in enumerate(row):
-                op[idx] = val
+            op = self.type_table.new_insert(row)  # values in column order
             session.apply(op)
         session.flush()
 
 

Reply via email to