This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit c02ad2fe699c7607d6fbe52d2c7e73ca2313d36e Author: Marton Greber <[email protected]> AuthorDate: Sun Oct 1 11:05:51 2023 +0000 [Python] KUDU-3351 Add write op metrics This is a follow-up patch for commit: 0ddcaaabc97c85a4715ae79ff5604feb9b342779, adding per-session write op metrics to the Python client. In the test function "test_insert_and_mutate_rows" I added verification function calls, to check that the metrics are gathered properly. Only "upsert_ignore_errors" is left out, as UPSERT IGNORE is not yet supported by the Python client. I'm planning to address that in the next patch. Change-Id: Id76f8f0cb11ef5e4b9d06508a39492bc6b0109a9 Reviewed-on: http://gerrit.cloudera.org:8080/20526 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> --- python/kudu/client.pyx | 16 ++++++++++++++++ python/kudu/libkudu_client.pxd | 1 + python/kudu/tests/common.py | 20 ++++++++++++++++++++ python/kudu/tests/test_client.py | 3 +++ 4 files changed, 40 insertions(+) diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx index fdebff0c5..d76bdf9e1 100644 --- a/python/kudu/client.pyx +++ b/python/kudu/client.pyx @@ -1734,6 +1734,22 @@ cdef class Session: return result, overflowed + def get_write_op_metrics(self): + """ + Return the cumulative write operation metrics since the beginning of the session. + + Returns + ------- + metrics : Dictionary + """ + _map = self.s.get().GetWriteOpMetrics().Get() + + # Convert map to python dictionary + result = {} + for it in _map: + result[frombytes(it.first)] = it.second + return result + cdef class Row: diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd index 4757153b9..5ff8e3229 100644 --- a/python/kudu/libkudu_client.pxd +++ b/python/kudu/libkudu_client.pxd @@ -750,6 +750,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil: int CountPendingErrors() void GetPendingErrors(vector[C_KuduError*]* errors, c_bool* overflowed) + ResourceMetrics& GetWriteOpMetrics() KuduClient* client() diff --git a/python/kudu/tests/common.py b/python/kudu/tests/common.py index c86aac4a8..f0646d1ea 100644 --- a/python/kudu/tests/common.py +++ b/python/kudu/tests/common.py @@ -172,3 +172,23 @@ class KuduTestBase(object): "subject" : "test", "is_valid_key" : is_valid_key}}) return resp['createJwt']['jwt'] + + @classmethod + def doVerifyMetrics(cls, session, + successful_inserts, + insert_ignore_errors, + successful_upserts, + upsert_ignore_errors, + successful_updates, + update_ignore_errors, + successful_deletes, + delete_ignore_errors,): + metrics = session.get_write_op_metrics() + assert successful_inserts == metrics["successful_inserts"] + assert insert_ignore_errors == metrics["insert_ignore_errors"] + assert successful_upserts == metrics["successful_upserts"] + assert upsert_ignore_errors == metrics["upsert_ignore_errors"] + assert successful_updates == metrics["successful_updates"] + assert update_ignore_errors == metrics["update_ignore_errors"] + assert successful_deletes == metrics["successful_deletes"] + assert delete_ignore_errors == metrics["delete_ignore_errors"] diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py index 020dec96a..5f910df2c 100755 --- a/python/kudu/tests/test_client.py +++ b/python/kudu/tests/test_client.py @@ -306,6 +306,7 @@ class TestClient(KuduTestBase, CompatUnitTest): # synchronous session.flush() + self.doVerifyMetrics(session, 100, 0, 0, 0, 0, 0, 0, 0) # Update a row, upsert another one op = table.new_update() @@ -322,6 +323,7 @@ class TestClient(KuduTestBase, CompatUnitTest): 2: 'upserted'}) session.apply(op) session.flush() + self.doVerifyMetrics(session, 100, 0, 1, 0, 1, 0, 0, 0) # Insert ignore existing row op = table.new_insert_ignore((3, 1, 'hello_1')) @@ -352,6 +354,7 @@ class TestClient(KuduTestBase, CompatUnitTest): op = table.new_delete_ignore({'key': i}) session.apply(op) session.flush() + self.doVerifyMetrics(session, 100, 1, 1, 0, 1, 1, 100, 1) scanner = table.scanner().open() assert len(scanner.read_all_tuples()) == 0
