This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new ff6d80f [python] KUDU-1563. Add support for UPDATE_IGNORE and DELETE_IGNORE
ff6d80f is described below
commit ff6d80f3e91ae9011545edc9b94b25753908d790
Author: Grant Henke <[email protected]>
AuthorDate: Tue Oct 27 14:52:27 2020 -0500
[python] KUDU-1563. Add support for UPDATE_IGNORE and DELETE_IGNORE
Implements Python support for the `UPDATE_IGNORE` and
`DELETE_IGNORE` operations.
Change-Id: Iad0fbd1db1edc140b88e1559ecf5d89bf6d9cbb9
Reviewed-on: http://gerrit.cloudera.org:8080/16667
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <[email protected]>
---
python/kudu/__init__.py | 3 ++-
python/kudu/client.pyx | 53 ++++++++++++++++++++++++++++++++++++++++
python/kudu/libkudu_client.pxd | 14 +++++++++++
python/kudu/tests/test_client.py | 12 ++++++++-
4 files changed, 80 insertions(+), 2 deletions(-)
diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py
index 4c53905..ece031c 100644
--- a/python/kudu/__init__.py
+++ b/python/kudu/__init__.py
@@ -16,7 +16,8 @@
# under the License.
from kudu.client import (Client, Table, Scanner, Session, # noqa
- Insert, InsertIgnore, Update, Delete, Predicate,
+ Insert, InsertIgnore, Update, UpdateIgnore,
+ Delete, DeleteIgnore, Predicate,
TimeDelta, KuduError, ScanTokenBuilder,
ScanToken,
LEADER_ONLY,
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 90f84fe..8edf511 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -915,6 +915,24 @@ cdef class Table:
"""
return Update(self, record)
+ def new_update_ignore(self, record=None):
+ """
+ Create a new UpdateIgnore operation. Pass the completed UpdateIgnore to a Session.
+ If a record is provided, a PartialRow will be initialized with values
+ from the input record. The record can be in the form of a tuple, dict,
+ or list. Dictionary keys can be either column names, indexes, or a
+ mix of both names and indexes.
+
+ Parameters
+ ----------
+ record : tuple/list/dict
+
+ Returns
+ -------
+ updateIgnore : UpdateIgnore
+ """
+ return UpdateIgnore(self, record)
+
def new_delete(self, record=None):
"""
Create a new Delete operation. Pass the completed Update to a Session.
@@ -933,6 +951,24 @@ cdef class Table:
"""
return Delete(self, record)
+ def new_delete_ignore(self, record=None):
+ """
+ Create a new DeleteIgnore operation. Pass the completed DeleteIgnore to a Session.
+ If a record is provided, a PartialRow will be initialized with values
+ from the input record. The record can be in the form of a tuple, dict,
+ or list. Dictionary keys can be either column names, indexes, or a
+ mix of both names and indexes.
+
+ Parameters
+ ----------
+ record : tuple/list/dict
+
+ Returns
+ -------
+ deleteIgnore : DeleteIgnore
+ """
+ return DeleteIgnore(self, record)
+
def scanner(self):
"""
Create a new scanner for this table for retrieving a selection of table
@@ -2920,6 +2956,15 @@ cdef class Update(WriteOperation):
def __dealloc__(self):
del self.op
+cdef class UpdateIgnore(WriteOperation):
+ def __cinit__(self, Table table, record=None):
+ self.op = table.ptr().NewUpdateIgnore()
+ self.py_row.row = self.op.mutable_row()
+ if record:
+ self.py_row.from_record(record)
+
+ def __dealloc__(self):
+ del self.op
cdef class Delete(WriteOperation):
def __cinit__(self, Table table, record=None):
@@ -2931,7 +2976,15 @@ cdef class Delete(WriteOperation):
def __dealloc__(self):
del self.op
+cdef class DeleteIgnore(WriteOperation):
+ def __cinit__(self, Table table, record=None):
+ self.op = table.ptr().NewDeleteIgnore()
+ self.py_row.row = self.op.mutable_row()
+ if record:
+ self.py_row.from_record(record)
+ def __dealloc__(self):
+ del self.op
cdef inline cast_pyvalue(DataType t, object o):
if t == KUDU_BOOL:
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index 8ede50e..54fc553 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -450,6 +450,10 @@ cdef extern from "kudu/client/write_op.h" namespace "kudu::client" nogil:
INSERT " kudu::client::KuduWriteOperation::INSERT"
UPDATE " kudu::client::KuduWriteOperation::UPDATE"
DELETE " kudu::client::KuduWriteOperation::DELETE"
+ UPSERT " kudu::client::KuduWriteOperation::UPSERT"
+ INSERT_IGNORE " kudu::client::KuduWriteOperation::INSERT_IGNORE"
+ UPDATE_IGNORE " kudu::client::KuduWriteOperation::UPDATE_IGNORE"
+ DELETE_IGNORE " kudu::client::KuduWriteOperation::DELETE_IGNORE"
cdef cppclass KuduWriteOperation:
KuduPartialRow& row()
@@ -474,9 +478,15 @@ cdef extern from "kudu/client/write_op.h" namespace "kudu::client" nogil:
cdef cppclass KuduDelete(KuduWriteOperation):
pass
+ cdef cppclass KuduDeleteIgnore(KuduWriteOperation):
+ pass
+
cdef cppclass KuduUpdate(KuduWriteOperation):
pass
+ cdef cppclass KuduUpdateIgnore(KuduWriteOperation):
+ pass
+
cdef extern from "kudu/client/scan_predicate.h" namespace "kudu::client" nogil:
enum ComparisonOp" kudu::client::KuduPredicate::ComparisonOp":
@@ -629,7 +639,9 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
KuduInsertIgnore* NewInsertIgnore()
KuduUpsert* NewUpsert()
KuduUpdate* NewUpdate()
+ KuduUpdateIgnore* NewUpdateIgnore()
KuduDelete* NewDelete()
+ KuduDeleteIgnore* NewDeleteIgnore()
KuduPredicate* NewComparisonPredicate(const Slice& col_name,
ComparisonOp op,
@@ -665,7 +677,9 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
Status Apply(KuduInsertIgnore* write_op)
Status Apply(KuduUpsert* write_op)
Status Apply(KuduUpdate* write_op)
+ Status Apply(KuduUpdateIgnore* write_op)
Status Apply(KuduDelete* write_op)
+ Status Apply(KuduDeleteIgnore* write_op)
# This is thread-safe
Status Flush()
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index eae0d49..b7f4ffe 100755
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -213,6 +213,13 @@ class TestClient(KuduTestBase, unittest.TestCase):
op = table.new_insert_ignore((3, 1, 'hello_1'))
session.apply(op)
+ # Update ignore a missing row
+ op = table.new_update_ignore((-999, 1, 'hello_1'))
+ session.apply(op)
+
+ # Delete ignore a missing row
+ op = table.new_delete_ignore({'key': -998})
+ session.apply(op)
scanner = table.scanner().open()
rows = dict((t[0], t) for t in scanner.read_all_tuples())
@@ -225,7 +232,10 @@ class TestClient(KuduTestBase, unittest.TestCase):
# Delete the rows we just wrote
for i in range(nrows):
- op = table.new_delete({'key': i})
+ if i % 2 == 0:
+ op = table.new_delete({'key': i})
+ else:
+ op = table.new_delete_ignore({'key': i})
session.apply(op)
session.flush()