This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 1b015d7ee9cc8af262f398961cf471f76bb225ea
Author: Marton Greber <[email protected]>
AuthorDate: Sun Oct 1 11:13:06 2023 +0000

    [Python] KUDU-3353 add support for UPSERT IGNORE
    
    This patch is a follow-up to commit
    ec3a9f75b6924a70ecbf08e3805228ad9b92b9f0; it adds UPSERT IGNORE support
    to the Python client.
    
    Extended the already existing tests:
    * added write op metrics verification for immutable column tests,
    * extended immutable column tests with UPSERT IGNORE test,
    * addressed an UPSERT IGNORE TODO in the auto-incrementing column tests.
    
    Change-Id: I9112b96a5688287352307c05e60030a217154cbd
    Reviewed-on: http://gerrit.cloudera.org:8080/20527
    Tested-by: Kudu Jenkins
    Reviewed-by: Yingchun Lai <[email protected]>
---
 python/kudu/__init__.py          |   4 +-
 python/kudu/client.pyx           |  29 ++++++++++
 python/kudu/libkudu_client.pxd   |   5 ++
 python/kudu/tests/test_client.py | 117 ++++++++++++++++++++++++++++++++++-----
 4 files changed, 139 insertions(+), 16 deletions(-)

diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py
index f0990e53d..f932f6bcc 100644
--- a/python/kudu/__init__.py
+++ b/python/kudu/__init__.py
@@ -17,8 +17,8 @@
 
 from kudu.client import (Client, Table, Scanner, Session,  # noqa
                          Insert, InsertIgnore, Update, UpdateIgnore,
-                         Delete, DeleteIgnore, Predicate,
-                         TimeDelta, KuduError, ScanTokenBuilder,
+                         Delete, DeleteIgnore, Upsert, UpsertIgnore,
+                         Predicate, TimeDelta, KuduError, ScanTokenBuilder,
                          ScanToken,
                          LEADER_ONLY,
                          CLOSEST_REPLICA,
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index d76bdf9e1..861346b22 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -1052,6 +1052,24 @@ cdef class Table:
         """
         return Upsert(self, record)
 
+    def new_upsert_ignore(self, record=None):
+        """
+        Create a new UpsertIgnore operation. Pass the completed UpsertIgnore to a Session.
+        If a record is provided, a PartialRow will be initialized with values
+        from the input record. The record can be in the form of a tuple, dict,
+        or list. Dictionary keys can be either column names, indexes, or a
+        mix of both names and indexes.
+
+        Parameters
+        ----------
+        record : tuple/list/dict
+
+        Returns
+        -------
+        upsertIgnore : UpsertIgnore
+        """
+        return UpsertIgnore(self, record)
+
     def new_update(self, record=None):
         """
         Create a new Update operation. Pass the completed Update to a Session.
@@ -3183,6 +3201,17 @@ cdef class Upsert(WriteOperation):
         del self.op
 
 
+cdef class UpsertIgnore(WriteOperation):
+    def __cinit__(self, Table table, record=None):
+        self.op = table.ptr().NewUpsertIgnore()
+        self.py_row.row = self.op.mutable_row()
+        if record:
+            self.py_row.from_record(record)
+
+    def __dealloc__(self):
+        del self.op
+
+
 cdef class Update(WriteOperation):
     def __cinit__(self, Table table, record=None):
         self.op = table.ptr().NewUpdate()
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index 5ff8e3229..4fd96ded3 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -464,6 +464,7 @@ cdef extern from "kudu/client/write_op.h" namespace "kudu::client" nogil:
         INSERT_IGNORE " kudu::client::KuduWriteOperation::INSERT_IGNORE"
         UPDATE_IGNORE " kudu::client::KuduWriteOperation::UPDATE_IGNORE"
         DELETE_IGNORE " kudu::client::KuduWriteOperation::DELETE_IGNORE"
+        UPSERT_IGNORE " kudu::client::KuduWriteOperation::UPSERT_IGNORE"
 
     cdef cppclass KuduWriteOperation:
         KuduPartialRow& row()
@@ -485,6 +486,9 @@ cdef extern from "kudu/client/write_op.h" namespace "kudu::client" nogil:
     cdef cppclass KuduUpsert(KuduWriteOperation):
         pass
 
+    cdef cppclass KuduUpsertIgnore(KuduWriteOperation):
+        pass
+
     cdef cppclass KuduDelete(KuduWriteOperation):
         pass
 
@@ -684,6 +688,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         KuduInsert* NewInsert()
         KuduInsertIgnore* NewInsertIgnore()
         KuduUpsert* NewUpsert()
+        KuduUpsertIgnore* NewUpsertIgnore()
         KuduUpdate* NewUpdate()
         KuduUpdateIgnore* NewUpdateIgnore()
         KuduDelete* NewDelete()
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index 5f910df2c..6a3f101b1 100755
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -378,22 +378,32 @@ class TestClient(KuduTestBase, CompatUnitTest):
             session = self.client.new_session()
             op = table.new_insert()
             op['key'] = 1
-            op['immutable_data'] = 'text'
-            op['mutable_data'] = 'text'
+            op['immutable_data'] = 'immutable_text'
+            op['mutable_data'] = 'mutable_text'
             session.apply(op)
             session.flush()
+            # successful_inserts++
+            self.doVerifyMetrics(session, 1, 0, 0, 0, 0, 0, 0, 0)
+            results = table.scanner().open().read_all_tuples()
+            assert 1 == len(results)
+            assert (1, 'immutable_text', 'mutable_text') in results
 
             # Update the mutable columns
             op = table.new_update()
             op['key'] = 1
-            op['mutable_data'] = 'new_text'
+            op['mutable_data'] = 'new_mutable_text_update'
             session.apply(op)
             session.flush()
+            # successful_updates++
+            self.doVerifyMetrics(session, 1, 0, 0, 0, 1, 0, 0, 0)
+            results = table.scanner().open().read_all_tuples()
+            assert 1 == len(results)
+            assert (1, 'immutable_text', 'new_mutable_text_update') in results
 
             # Update the immutable column
             op = table.new_update()
             op['key'] = 1
-            op['immutable_data'] = 'new_text'
+            op['immutable_data'] = 'new_immutable_text_update'
             session.apply(op)
             try:
                 session.flush()
@@ -403,19 +413,26 @@ class TestClient(KuduTestBase, CompatUnitTest):
                 assert not overflow
                 assert len(errors) == 1
                 assert message in repr(errors[0])
+            # nothing changed
+            self.doVerifyMetrics(session, 1, 0, 0, 0, 1, 0, 0, 0)
 
             # Update ignore on both mutable and immutable columns. The error 
is ignored.
             op = table.new_update_ignore()
             op['key'] = 1
-            op['immutable_data'] = 'new_text'
-            op['mutable_data'] = 'new_text'
+            op['immutable_data'] = 'new_immutable_text_update_ignore'
+            op['mutable_data'] = 'new_mutable_text_update_ignore'
             session.apply(op)
             session.flush()
+            # successful_updates++, update_ignore_errors++
+            self.doVerifyMetrics(session, 1, 0, 0, 0, 2, 1, 0, 0)
+            results = table.scanner().open().read_all_tuples()
+            assert 1 == len(results)
+            assert (1, 'immutable_text', 'new_mutable_text_update_ignore') in results
 
-            # Update ignore the immutable column
+            # Update ignore only the immutable column results in 'Invalid argument' error
             op = table.new_update_ignore()
             op['key'] = 1
-            op['immutable_data'] = 'new_text'
+            op['immutable_data'] = 'new_immutable_text_update_ignore'
             session.apply(op)
             try:
                 session.flush()
@@ -425,20 +442,28 @@ class TestClient(KuduTestBase, CompatUnitTest):
                 assert not overflow
                 assert len(errors) == 1
                 assert message in repr(errors[0])
-
-            # TODO: test upsert ignore, once it is supported by the Python client.
+            # nothing changed
+            self.doVerifyMetrics(session, 1, 0, 0, 0, 2, 1, 0, 0)
+            results = table.scanner().open().read_all_tuples()
+            assert 1 == len(results)
+            assert (1, 'immutable_text', 'new_mutable_text_update_ignore') in results
 
             # Upsert the mutable columns
             op = table.new_upsert()
             op['key'] = 1
-            op['mutable_data'] = 'new_text'
+            op['mutable_data'] = 'new_mutable_text_upsert'
             session.apply(op)
             session.flush()
+            # successful_upserts++
+            self.doVerifyMetrics(session, 1, 0, 1, 0, 2, 1, 0, 0)
+            results = table.scanner().open().read_all_tuples()
+            assert 1 == len(results)
+            assert (1, 'immutable_text', 'new_mutable_text_upsert') in results
 
             # Upsert the immutable column
             op = table.new_upsert()
             op['key'] = 1
-            op['immutable_data'] = 'new_text'
+            op['immutable_data'] = 'new_immutable_text_upsert'
             session.apply(op)
             try:
                 session.flush()
@@ -448,6 +473,67 @@ class TestClient(KuduTestBase, CompatUnitTest):
                 assert not overflow
                 assert len(errors) == 1
                 assert message in repr(errors[0])
+            # nothing changed
+            self.doVerifyMetrics(session, 1, 0, 1, 0, 2, 1, 0, 0)
+            results = table.scanner().open().read_all_tuples()
+            assert 1 == len(results)
+            assert (1, 'immutable_text', 'new_mutable_text_upsert') in results
+
+            # Upsert ignore can insert a row without immutable column set
+            op = table.new_upsert_ignore()
+            op['key'] = 2
+            op['mutable_data'] = 'mutable_text'
+            session.apply(op)
+            session.flush()
+            # successful_upserts++
+            self.doVerifyMetrics(session, 1, 0, 2, 0, 2, 1, 0, 0)
+            results = table.scanner().open().read_all_tuples()
+            assert 2 == len(results)
+            assert (1, 'immutable_text', 'new_mutable_text_upsert') in results
+            assert (2, None, 'mutable_text') in results
+
+            # Upsert ignore can insert a new row with immutable column set
+            op = table.new_upsert_ignore()
+            op['key'] = 3
+            op['immutable_data'] = 'immutable_text'
+            session.apply(op)
+            session.flush()
+            # successful_upserts++
+            self.doVerifyMetrics(session, 1, 0, 3, 0, 2, 1, 0, 0)
+            results = table.scanner().open().read_all_tuples()
+            assert 3 == len(results)
+            assert (1, 'immutable_text', 'new_mutable_text_upsert') in results
+            assert (2, None, 'mutable_text') in results
+            assert (3, 'immutable_text', None) in results
+
+            # Upsert ignore can update existing row without immutable column set
+            op = table.new_upsert_ignore()
+            op['key'] = 1
+            op['mutable_data'] = 'new_mutable_text_upsert_ignore'
+            session.apply(op)
+            session.flush()
+            # successful_upserts++
+            self.doVerifyMetrics(session, 1, 0, 4, 0, 2, 1, 0, 0)
+            results = table.scanner().open().read_all_tuples()
+            assert 3 == len(results)
+            assert (1, 'immutable_text', 'new_mutable_text_upsert_ignore') in results
+            assert (2, None, 'mutable_text') in results
+            assert (3, 'immutable_text', None) in results
+
+            # Try to upsert ignore existing row with immutable column set
+            op = table.new_upsert_ignore()
+            op['key'] = 1
+            op['immutable_data'] = 'new_immutable_text_upsert_ignore'
+            op['mutable_data'] = 'new_mutable_text_upsert_ignore_2'
+            session.apply(op)
+            session.flush()
+            # successful_upserts++, upsert_ignore_errors++
+            self.doVerifyMetrics(session, 1, 0, 5, 1, 2, 1, 0, 0)
+            results = table.scanner().open().read_all_tuples()
+            assert 3 == len(results)
+            assert (1, 'immutable_text', 'new_mutable_text_upsert_ignore_2') in results
+            assert (2, None, 'mutable_text') in results
+            assert (3, 'immutable_text', None) in results
 
         finally:
             try:
@@ -494,8 +580,11 @@ class TestClient(KuduTestBase, CompatUnitTest):
             op[Schema.get_auto_incrementing_column_name()] = 1
             session.apply(op)
 
-            # TODO: once upsert_ignore is supported by the Python client,
-            # check if specifying all the key columns works.
+            # Upsert ignore with auto-incrementing column specified
+            op = table.new_upsert_ignore()
+            op['key'] = 1
+            op[Schema.get_auto_incrementing_column_name()] = 1
+            session.apply(op)
 
             # With non-unique primary key, one can't use the tuple/list 
initialization for new
             # inserts. In this case, at the second position it would like to 
get an int64 (the type

Reply via email to