This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 1b015d7ee9cc8af262f398961cf471f76bb225ea Author: Marton Greber <[email protected]> AuthorDate: Sun Oct 1 11:13:06 2023 +0000 [Python] KUDU-3353 add support for UPSERT IGNORE This patch is a follow-up to commit: ec3a9f75b6924a70ecbf08e3805228ad9b92b9f0, it adds UPSERT IGNORE support to the Python client. Extended the already existing tests: * added write op metrics verification for immutable column tests, * extended immutable column tests with UPSERT IGNORE test, * addressed an UPSERT IGNORE TODO in the auto-incrementing column tests. Change-Id: I9112b96a5688287352307c05e60030a217154cbd Reviewed-on: http://gerrit.cloudera.org:8080/20527 Tested-by: Kudu Jenkins Reviewed-by: Yingchun Lai <[email protected]> --- python/kudu/__init__.py | 4 +- python/kudu/client.pyx | 29 ++++++++++ python/kudu/libkudu_client.pxd | 5 ++ python/kudu/tests/test_client.py | 117 ++++++++++++++++++++++++++++++++++----- 4 files changed, 139 insertions(+), 16 deletions(-) diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py index f0990e53d..f932f6bcc 100644 --- a/python/kudu/__init__.py +++ b/python/kudu/__init__.py @@ -17,8 +17,8 @@ from kudu.client import (Client, Table, Scanner, Session, # noqa Insert, InsertIgnore, Update, UpdateIgnore, - Delete, DeleteIgnore, Predicate, - TimeDelta, KuduError, ScanTokenBuilder, + Delete, DeleteIgnore, Upsert, UpsertIgnore, + Predicate, TimeDelta, KuduError, ScanTokenBuilder, ScanToken, LEADER_ONLY, CLOSEST_REPLICA, diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx index d76bdf9e1..861346b22 100644 --- a/python/kudu/client.pyx +++ b/python/kudu/client.pyx @@ -1052,6 +1052,24 @@ cdef class Table: """ return Upsert(self, record) + def new_upsert_ignore(self, record=None): + """ + Create a new UpsertIgnore operation. Pass the completed UpsertIgnore to a Session. + If a record is provided, a PartialRow will be initialized with values + from the input record. The record can be in the form of a tuple, dict, + or list. 
Dictionary keys can be either column names, indexes, or a + mix of both names and indexes. + + Parameters + ---------- + record : tuple/list/dict + + Returns + ------- + upsertIgnore : UpsertIgnore + """ + return UpsertIgnore(self, record) + def new_update(self, record=None): """ Create a new Update operation. Pass the completed Update to a Session. @@ -3183,6 +3201,17 @@ cdef class Upsert(WriteOperation): del self.op +cdef class UpsertIgnore(WriteOperation): + def __cinit__(self, Table table, record=None): + self.op = table.ptr().NewUpsertIgnore() + self.py_row.row = self.op.mutable_row() + if record: + self.py_row.from_record(record) + + def __dealloc__(self): + del self.op + + cdef class Update(WriteOperation): def __cinit__(self, Table table, record=None): self.op = table.ptr().NewUpdate() diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd index 5ff8e3229..4fd96ded3 100644 --- a/python/kudu/libkudu_client.pxd +++ b/python/kudu/libkudu_client.pxd @@ -464,6 +464,7 @@ cdef extern from "kudu/client/write_op.h" namespace "kudu::client" nogil: INSERT_IGNORE " kudu::client::KuduWriteOperation::INSERT_IGNORE" UPDATE_IGNORE " kudu::client::KuduWriteOperation::UPDATE_IGNORE" DELETE_IGNORE " kudu::client::KuduWriteOperation::DELETE_IGNORE" + UPSERT_IGNORE " kudu::client::KuduWriteOperation::UPSERT_IGNORE" cdef cppclass KuduWriteOperation: KuduPartialRow& row() @@ -485,6 +486,9 @@ cdef extern from "kudu/client/write_op.h" namespace "kudu::client" nogil: cdef cppclass KuduUpsert(KuduWriteOperation): pass + cdef cppclass KuduUpsertIgnore(KuduWriteOperation): + pass + cdef cppclass KuduDelete(KuduWriteOperation): pass @@ -684,6 +688,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil: KuduInsert* NewInsert() KuduInsertIgnore* NewInsertIgnore() KuduUpsert* NewUpsert() + KuduUpsertIgnore* NewUpsertIgnore() KuduUpdate* NewUpdate() KuduUpdateIgnore* NewUpdateIgnore() KuduDelete* NewDelete() diff --git 
a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py index 5f910df2c..6a3f101b1 100755 --- a/python/kudu/tests/test_client.py +++ b/python/kudu/tests/test_client.py @@ -378,22 +378,32 @@ class TestClient(KuduTestBase, CompatUnitTest): session = self.client.new_session() op = table.new_insert() op['key'] = 1 - op['immutable_data'] = 'text' - op['mutable_data'] = 'text' + op['immutable_data'] = 'immutable_text' + op['mutable_data'] = 'mutable_text' session.apply(op) session.flush() + # successful_inserts++ + self.doVerifyMetrics(session, 1, 0, 0, 0, 0, 0, 0, 0) + results = table.scanner().open().read_all_tuples() + assert 1 == len(results) + assert (1, 'immutable_text', 'mutable_text') in results # Update the mutable columns op = table.new_update() op['key'] = 1 - op['mutable_data'] = 'new_text' + op['mutable_data'] = 'new_mutable_text_update' session.apply(op) session.flush() + # successful_updates++ + self.doVerifyMetrics(session, 1, 0, 0, 0, 1, 0, 0, 0) + results = table.scanner().open().read_all_tuples() + assert 1 == len(results) + assert (1, 'immutable_text', 'new_mutable_text_update') in results # Update the immutable column op = table.new_update() op['key'] = 1 - op['immutable_data'] = 'new_text' + op['immutable_data'] = 'new_immutable_text_update' session.apply(op) try: session.flush() @@ -403,19 +413,26 @@ class TestClient(KuduTestBase, CompatUnitTest): assert not overflow assert len(errors) == 1 assert message in repr(errors[0]) + # nothing changed + self.doVerifyMetrics(session, 1, 0, 0, 0, 1, 0, 0, 0) # Update ignore on both mutable and immutable columns. The error is ignored. 
op = table.new_update_ignore() op['key'] = 1 - op['immutable_data'] = 'new_text' - op['mutable_data'] = 'new_text' + op['immutable_data'] = 'new_immutable_text_update_ignore' + op['mutable_data'] = 'new_mutable_text_update_ignore' session.apply(op) session.flush() + # successful_updates++, update_ignore_errors++ + self.doVerifyMetrics(session, 1, 0, 0, 0, 2, 1, 0, 0) + results = table.scanner().open().read_all_tuples() + assert 1 == len(results) + assert (1, 'immutable_text', 'new_mutable_text_update_ignore') in results - # Update ignore the immutable column + # Update ignore only the immutable column results in 'Invalid argument' error op = table.new_update_ignore() op['key'] = 1 - op['immutable_data'] = 'new_text' + op['immutable_data'] = 'new_immutable_text_update_ignore' session.apply(op) try: session.flush() @@ -425,20 +442,28 @@ class TestClient(KuduTestBase, CompatUnitTest): assert not overflow assert len(errors) == 1 assert message in repr(errors[0]) - - # TODO: test upsert ignore, once it is supported by the Python client. 
+ # nothing changed + self.doVerifyMetrics(session, 1, 0, 0, 0, 2, 1, 0, 0) + results = table.scanner().open().read_all_tuples() + assert 1 == len(results) + assert (1, 'immutable_text', 'new_mutable_text_update_ignore') in results # Upsert the mutable columns op = table.new_upsert() op['key'] = 1 - op['mutable_data'] = 'new_text' + op['mutable_data'] = 'new_mutable_text_upsert' session.apply(op) session.flush() + # successful_upserts++ + self.doVerifyMetrics(session, 1, 0, 1, 0, 2, 1, 0, 0) + results = table.scanner().open().read_all_tuples() + assert 1 == len(results) + assert (1, 'immutable_text', 'new_mutable_text_upsert') in results # Upsert the immutable column op = table.new_upsert() op['key'] = 1 - op['immutable_data'] = 'new_text' + op['immutable_data'] = 'new_immutable_text_upsert' session.apply(op) try: session.flush() @@ -448,6 +473,67 @@ class TestClient(KuduTestBase, CompatUnitTest): assert not overflow assert len(errors) == 1 assert message in repr(errors[0]) + # nothing changed + self.doVerifyMetrics(session, 1, 0, 1, 0, 2, 1, 0, 0) + results = table.scanner().open().read_all_tuples() + assert 1 == len(results) + assert (1, 'immutable_text', 'new_mutable_text_upsert') in results + + # Upsert ignore can insert a row without immutable column set + op = table.new_upsert_ignore() + op['key'] = 2 + op['mutable_data'] = 'mutable_text' + session.apply(op) + session.flush() + # successful_upserts++ + self.doVerifyMetrics(session, 1, 0, 2, 0, 2, 1, 0, 0) + results = table.scanner().open().read_all_tuples() + assert 2 == len(results) + assert (1, 'immutable_text', 'new_mutable_text_upsert') in results + assert (2, None, 'mutable_text') in results + + # Upsert ignore can insert a new row with immutable column set + op = table.new_upsert_ignore() + op['key'] = 3 + op['immutable_data'] = 'immutable_text' + session.apply(op) + session.flush() + # successful_upserts++ + self.doVerifyMetrics(session, 1, 0, 3, 0, 2, 1, 0, 0) + results = 
table.scanner().open().read_all_tuples() + assert 3 == len(results) + assert (1, 'immutable_text', 'new_mutable_text_upsert') in results + assert (2, None, 'mutable_text') in results + assert (3, 'immutable_text', None) in results + + # Upsert ignore can update existing row without immutable column set + op = table.new_upsert_ignore() + op['key'] = 1 + op['mutable_data'] = 'new_mutable_text_upsert_ignore' + session.apply(op) + session.flush() + # successful_upserts++ + self.doVerifyMetrics(session, 1, 0, 4, 0, 2, 1, 0, 0) + results = table.scanner().open().read_all_tuples() + assert 3 == len(results) + assert (1, 'immutable_text', 'new_mutable_text_upsert_ignore') in results + assert (2, None, 'mutable_text') in results + assert (3, 'immutable_text', None) in results + + # Try to upsert ignore existing row with immutable column set + op = table.new_upsert_ignore() + op['key'] = 1 + op['immutable_data'] = 'new_immutable_text_upsert_ignore' + op['mutable_data'] = 'new_mutable_text_upsert_ignore_2' + session.apply(op) + session.flush() + # successful_upserts++, upsert_ignore_errors++ + self.doVerifyMetrics(session, 1, 0, 5, 1, 2, 1, 0, 0) + results = table.scanner().open().read_all_tuples() + assert 3 == len(results) + assert (1, 'immutable_text', 'new_mutable_text_upsert_ignore_2') in results + assert (2, None, 'mutable_text') in results + assert (3, 'immutable_text', None) in results finally: try: @@ -494,8 +580,11 @@ class TestClient(KuduTestBase, CompatUnitTest): op[Schema.get_auto_incrementing_column_name()] = 1 session.apply(op) - # TODO: once upsert_ignore is supported by the Python client, - # check if specifying all the key columns works. + # Upsert ignore with auto-incrementing column specified + op = table.new_upsert_ignore() + op['key'] = 1 + op[Schema.get_auto_incrementing_column_name()] = 1 + session.apply(op) # With non-unique primary key, one can't use the tuple/list initialization for new # inserts. 
In this case, at the second position it would like to get an int64 (the type
