This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 6d48cd6f90e7820fbabd0560139ad48fe9d66a29 Author: Marton Greber <[email protected]> AuthorDate: Wed May 10 13:56:02 2023 +0000 KUDU-1945 Add Python example for non-unique PK In commit 3019848d00ac721b30d98a1aeb77bc205352b7b5 a C++ client example has been added to showcase the usage of the non-unique PK. This patch translates this particular example to Python. The C++ examples are tested during upstream submission, however the Java and Python client examples are not tested automatically. Therefore, I did manual testing and created a ticket to track this improvement: KUDU-3478. Manual testing has been done using the following configurations: Ubuntu 18.04 x86_64: Python2.7 Ubuntu 18.04 x86_64: Python3.7 macOS Monterey M1(arm64): Python3.7 (I couldn't setup the Python2.7 interpreter properly on my mac) STDOUT of the example: kudu.Schema { non_unique_key int32 NOT NULL auto_incrementing_id int64 NOT NULL int_val int32 NOT NULL PRIMARY KEY (non_unique_key, auto_incrementing_id) } Demonstrating scanning ... (non_unique_key: 3, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 3, auto_incrementing_id: 2, int_val: 1) (non_unique_key: 3, auto_incrementing_id: 3, int_val: 2) (non_unique_key: 4, auto_incrementing_id: 4, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 2, int_val: 1) (non_unique_key: 1, auto_incrementing_id: 3, int_val: 2) (non_unique_key: 2, auto_incrementing_id: 4, int_val: 0) (non_unique_key: 2, auto_incrementing_id: 5, int_val: 1) (non_unique_key: 2, auto_incrementing_id: 6, int_val: 2) Scanned some row(s) WHERE non_unique_key = 1 (non_unique_key: 1, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 2, int_val: 1) (non_unique_key: 1, auto_incrementing_id: 3, int_val: 2) Demonstrating UPDATE ... 
Updated row(s) WHERE non_unique_key = 1 AND int_val = 2 to int_val = 98 (non_unique_key: 3, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 3, auto_incrementing_id: 2, int_val: 1) (non_unique_key: 3, auto_incrementing_id: 3, int_val: 2) (non_unique_key: 4, auto_incrementing_id: 4, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 2, int_val: 1) (non_unique_key: 1, auto_incrementing_id: 3, int_val: 98) (non_unique_key: 2, auto_incrementing_id: 4, int_val: 0) (non_unique_key: 2, auto_incrementing_id: 5, int_val: 1) (non_unique_key: 2, auto_incrementing_id: 6, int_val: 2) Updated row(s) WHERE non_unique_key = 2 to int_val = 99 (non_unique_key: 3, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 3, auto_incrementing_id: 2, int_val: 1) (non_unique_key: 3, auto_incrementing_id: 3, int_val: 2) (non_unique_key: 4, auto_incrementing_id: 4, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 2, int_val: 1) (non_unique_key: 1, auto_incrementing_id: 3, int_val: 98) (non_unique_key: 2, auto_incrementing_id: 4, int_val: 99) (non_unique_key: 2, auto_incrementing_id: 5, int_val: 99) (non_unique_key: 2, auto_incrementing_id: 6, int_val: 99) Updated row(s) WHERE non_unique_key = 2 AND auto_incrementing_id = 5\ to int_val = 100 (non_unique_key: 3, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 3, auto_incrementing_id: 2, int_val: 1) (non_unique_key: 3, auto_incrementing_id: 3, int_val: 2) (non_unique_key: 4, auto_incrementing_id: 4, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 2, int_val: 1) (non_unique_key: 1, auto_incrementing_id: 3, int_val: 98) (non_unique_key: 2, auto_incrementing_id: 4, int_val: 99) (non_unique_key: 2, auto_incrementing_id: 5, int_val: 100) (non_unique_key: 2, auto_incrementing_id: 6, int_val: 99) Demonstrating DELETE ... 
Deleted row(s) WHERE non_unique_key = 3 AND int_val = 1 (non_unique_key: 3, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 3, auto_incrementing_id: 3, int_val: 2) (non_unique_key: 4, auto_incrementing_id: 4, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 2, int_val: 1) (non_unique_key: 1, auto_incrementing_id: 3, int_val: 98) (non_unique_key: 2, auto_incrementing_id: 4, int_val: 99) (non_unique_key: 2, auto_incrementing_id: 5, int_val: 100) (non_unique_key: 2, auto_incrementing_id: 6, int_val: 99) Deleted row(s) WHERE non_unique_key = 2 (non_unique_key: 3, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 3, auto_incrementing_id: 3, int_val: 2) (non_unique_key: 4, auto_incrementing_id: 4, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 2, int_val: 1) (non_unique_key: 1, auto_incrementing_id: 3, int_val: 98) Deleted row(s) WHERE non_unique_key = 3 AND auto_incrementing_id = 3 (non_unique_key: 3, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 4, auto_incrementing_id: 4, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 1, int_val: 0) (non_unique_key: 1, auto_incrementing_id: 2, int_val: 1) (non_unique_key: 1, auto_incrementing_id: 3, int_val: 98) Deleted the table Done Change-Id: I1dd862c4f26b3d79ec268727363fae3426135f52 Reviewed-on: http://gerrit.cloudera.org:8080/19874 Tested-by: Kudu Jenkins Reviewed-by: Wenzhe Zhou <[email protected]> Reviewed-by: Attila Bukor <[email protected]> --- .../basic-python-example/non_unique_primary_key.py | 221 +++++++++++++++++++++ 1 file changed, 221 insertions(+) diff --git a/examples/python/basic-python-example/non_unique_primary_key.py b/examples/python/basic-python-example/non_unique_primary_key.py new file mode 100644 index 000000000..7adaf4391 --- /dev/null +++ b/examples/python/basic-python-example/non_unique_primary_key.py @@ -0,0 +1,221 @@ +# Licensed to the Apache Software 
# Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import argparse

import kudu
from kudu import Schema
from kudu.client import Partitioning


def print_tuple(t):
    """Pretty print one scanned row according to the schema.

    The row is read positionally: t[0] is printed as non_unique_key, t[1] under
    the name of the auto-incrementing column, and t[2] as int_val.
    """
    print('(non_unique_key: {0}, {1}: {2}, int_val: {3})'.format(
        t[0], Schema.get_auto_incrementing_column_name(), t[1], t[2]))


def print_rows(scanner):
    """Open the scanner and print every row it yields, batch by batch."""
    scanner = scanner.open()
    # The rows contain the auto-incrementing column. If one doesn't require it, it can be
    # discarded through a projection.
    while scanner.has_more_rows():
        for t in scanner.read_next_batch_tuples():
            print_tuple(t)


def _flush_and_report(session):
    """Flush the session; on kudu.KuduBadStatus print the session's pending errors."""
    try:
        session.flush()
    except kudu.KuduBadStatus:
        print(session.get_pending_errors())


def update_rows(client, table, scanner, new_val):
    """Set int_val to new_val for every row returned by the scanner.

    This function does not open the scanner itself; the call sites in this
    script open it before passing it in.
    """
    # It's necessary to specify the entire set of key columns when updating a particular row.
    # An auto-incrementing column is auto-populated at the server side, and one way to retrieve
    # its values is scanning the table with a projection that includes the auto-incrementing column.
    session = client.new_session()
    while scanner.has_more_rows():
        for t in scanner.read_next_batch_tuples():
            op = table.new_update()
            op['non_unique_key'] = t[0]
            op[Schema.get_auto_incrementing_column_name()] = t[1]
            op['int_val'] = new_val
            session.apply(op)

    _flush_and_report(session)


def delete_rows(client, table, scanner):
    """Delete every row returned by the scanner.

    This function does not open the scanner itself; the call sites in this
    script open it before passing it in.
    """
    # It's necessary to specify the entire set of key columns when deleting a particular row.
    # An auto-incrementing column is auto-populated at the server side, and one way to retrieve
    # its values is scanning the table with a projection that includes the auto-incrementing column.
    session = client.new_session()
    while scanner.has_more_rows():
        for t in scanner.read_next_batch_tuples():
            op = table.new_delete()
            op['non_unique_key'] = t[0]
            op[Schema.get_auto_incrementing_column_name()] = t[1]
            session.apply(op)

    _flush_and_report(session)


# Parse arguments
parser = argparse.ArgumentParser(description='Basic Example for Kudu Python.')
parser.add_argument('--masters', '-m', nargs='+', default='localhost',
                    help='The master address(es) to connect to Kudu.')
parser.add_argument('--ports', '-p', nargs='+', default='7051',
                    help='The master server port(s) to connect to Kudu.')
args = parser.parse_args()

client = kudu.connect(host=args.masters, port=args.ports)

builder = kudu.schema_builder()
# Columns which are not uniquely identifiable can still be used as primary keys by
# specifying them as non-unique primary key.
builder.add_column('non_unique_key').type(kudu.int32).nullable(False).non_unique_primary_key()
builder.add_column('int_val', type_=kudu.int32, nullable=False)
schema = builder.build()

# The schema stringification shows the presence of the auto-incrementing column,
# and the resulting composite primary key.
print(schema)

partitioning = Partitioning().add_hash_partitions(column_names=['non_unique_key'], num_buckets=2)

# Start from a clean slate: drop a table left over from a previous run, then create it.
table_name = 'non_unique_key-example'
if client.table_exists(table_name):
    client.delete_table(table_name)
client.create_table(table_name, schema, partitioning)

table = client.table(table_name)
session = client.new_session()
stale_counter = 0
num_rows = 10
divisor = 3
# stale_counter only advances every `divisor` iterations, so consecutive rows share the
# same non_unique_key value while int_val cycles through 0..divisor-1.
for i in range(num_rows):
    if i % divisor == 0:
        stale_counter += 1

    op = table.new_insert()
    # The auto-incrementing column is populated on the server-side automatically.
    op['non_unique_key'] = stale_counter
    op['int_val'] = i % divisor
    session.apply(op)

try:
    session.flush()
except kudu.KuduBadStatus:
    print(session.get_pending_errors())

print('Demonstrating scanning ...')
scanner = table.scanner()
print_rows(scanner)

non_unique_key_equals = 1
print('Scanned some row(s) WHERE non_unique_key = {0}'.format(non_unique_key_equals))
scanner = table.scanner()
# No explicit open() here: print_rows() opens the scanner itself.
scanner.add_predicate(table['non_unique_key'] == non_unique_key_equals)
print_rows(scanner)

print('Demonstrating UPDATE ...')
# Updating based upon a predicate on a non-unique PK and on a non-PK column
non_unique_key_equals = 1
int_val_equals = 2
new_val = 98
scanner = table.scanner()
scanner.add_predicate(table['non_unique_key'] == non_unique_key_equals)
scanner.add_predicate(table['int_val'] == int_val_equals)
scanner.open()
update_rows(client, table, scanner, new_val)

print('Updated row(s) WHERE non_unique_key = {0} AND int_val = {1} to int_val = {2}'
      .format(non_unique_key_equals, int_val_equals, new_val))
scanner = table.scanner()
print_rows(scanner)

# Updating based upon a predicate on a non-unique PK
non_unique_key_equals = 2
new_val = 99
scanner = table.scanner()
scanner.add_predicate(table['non_unique_key'] == non_unique_key_equals)
scanner.open()
update_rows(client, table, scanner, new_val)

print('Updated row(s) WHERE non_unique_key = {0} to int_val = {1}'
      .format(non_unique_key_equals, new_val))
scanner = table.scanner()
print_rows(scanner)

# Updating based upon a predicate on a non-unique PK and on the auto-incrementing column
non_unique_key_equals = 2
auto_incrementing_counter_val = 5
new_val = 100
scanner = table.scanner()
scanner.add_predicate(table['non_unique_key'] == non_unique_key_equals)
scanner.add_predicate(
    table[Schema.get_auto_incrementing_column_name()] == auto_incrementing_counter_val)
scanner.open()
update_rows(client, table, scanner, new_val)

print('Updated row(s) WHERE non_unique_key = {0} AND {1} = {2} to int_val = {3}'
      .format(non_unique_key_equals, Schema.get_auto_incrementing_column_name(),
              auto_incrementing_counter_val, new_val))
scanner = table.scanner()
print_rows(scanner)

print('Demonstrating DELETE ...')
# Deleting based upon a predicate on a non-unique PK and on a non-PK column
non_unique_key_equals = 3
int_val_equals = 1
scanner = table.scanner()
scanner.add_predicate(table['non_unique_key'] == non_unique_key_equals)
scanner.add_predicate(table['int_val'] == int_val_equals)
scanner.open()
delete_rows(client, table, scanner)

print('Deleted row(s) WHERE non_unique_key = {0} AND int_val = {1}'
      .format(non_unique_key_equals, int_val_equals))
scanner = table.scanner()
print_rows(scanner)

# Deleting based upon a predicate on a non-unique PK
non_unique_key_equals = 2
scanner = table.scanner()
scanner.add_predicate(table['non_unique_key'] == non_unique_key_equals)
scanner.open()
delete_rows(client, table, scanner)

# Only the key value belongs in this message; the template has a single placeholder.
print('Deleted row(s) WHERE non_unique_key = {0}'.format(non_unique_key_equals))
scanner = table.scanner()
print_rows(scanner)

# Deleting based upon a predicate on a non-unique PK and on the auto-incrementing column
non_unique_key_equals = 3
auto_incrementing_counter_val = 3
scanner = table.scanner()
scanner.add_predicate(table['non_unique_key'] == non_unique_key_equals)
scanner.add_predicate(
    table[Schema.get_auto_incrementing_column_name()] == auto_incrementing_counter_val)
scanner.open()
delete_rows(client, table, scanner)

print('Deleted row(s) WHERE non_unique_key = {0} AND {1} = {2}'
      .format(non_unique_key_equals, Schema.get_auto_incrementing_column_name(),
              auto_incrementing_counter_val))
scanner = table.scanner()
print_rows(scanner)

client.delete_table(table_name)
print('Deleted the table')
print('Done')
