Repository: kudu Updated Branches: refs/heads/master bce1dd777 -> cae2f9899
KUDU-1671 - [python] Enable predicate pushdown for additional types. Currently, the Python client does not support predicate pushdown for boolean and unixtime_micros values. Additionally, as pointed out in KUDU-1672, float predicates have a bug. This patch addresses both of these issues, and test cases have been added to validate the new functionality. Two minor namespace issues for the float and boolean types were also fixed: they are now exported as kudu.float and kudu.bool. Change-Id: If5766d24055dfba5fa371fc61c6dfd66adc54273 Reviewed-on: http://gerrit.cloudera.org:8080/4589 Tested-by: Kudu Jenkins Reviewed-by: David Ribeiro Alves <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4d1b1e97 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4d1b1e97 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4d1b1e97 Branch: refs/heads/master Commit: 4d1b1e973655b783f2e7b338b6574b2e2166e08f Parents: bce1dd7 Author: Jordan Birdsell <[email protected]> Authored: Sun Oct 2 16:58:17 2016 -0400 Committer: David Ribeiro Alves <[email protected]> Committed: Thu Oct 6 00:55:43 2016 +0000 ---------------------------------------------------------------------- python/kudu/__init__.py | 4 +- python/kudu/client.pyx | 40 +++++++-------- python/kudu/tests/test_scanner.py | 37 +++++++++++++- python/kudu/tests/test_scantoken.py | 40 +++++++++++++-- python/kudu/tests/util.py | 84 +++++++++++++++++++++++++++++++- python/kudu/util.py | 5 ++ 6 files changed, 182 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/4d1b1e97/python/kudu/__init__.py ---------------------------------------------------------------------- diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py index 99ca0d9..771f99a 100644 --- a/python/kudu/__init__.py +++ b/python/kudu/__init__.py @@ -30,8 +30,8 @@ from kudu.errors import (KuduException, 
KuduBadStatus, KuduNotFound, # noqa 
KuduInvalidArgument) from kudu.schema import (int8, int16, int32, int64, string_ as string, # noqa - double_ as double, float_, binary, - unixtime_micros, + double_ as double, float_, float_ as float, binary, + unixtime_micros, bool_ as bool, KuduType, SchemaBuilder, ColumnSpec, Schema, ColumnSchema, COMPRESSION_DEFAULT, http://git-wip-us.apache.org/repos/asf/kudu/blob/4d1b1e97/python/kudu/client.pyx ---------------------------------------------------------------------- diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx index 150997d..261fdbf 100644 --- a/python/kudu/client.pyx +++ b/python/kudu/client.pyx @@ -766,21 +766,28 @@ cdef class Column: cdef KuduValue* box_value(self, object obj) except NULL: cdef: KuduValue* val - Slice* slc + Slice slc - if isinstance(obj, unicode): - obj = obj.encode('utf8') - - if isinstance(obj, bytes): - slc = new Slice(<char*> obj, len(obj)) - val = KuduValue.CopyString(deref(slc)) - del slc - elif isinstance(obj, int): + if (self.spec.type.name[:3] == 'int'): val = KuduValue.FromInt(obj) - elif isinstance(obj, float): + elif (self.spec.type.name == 'string'): + if isinstance(obj, unicode): + obj = obj.encode('utf8') + + slc = Slice(<char*> obj, len(obj)) + val = KuduValue.CopyString(slc) + elif (self.spec.type.name == 'bool'): + val = KuduValue.FromBool(obj) + elif (self.spec.type.name == 'float'): + val = KuduValue.FromFloat(obj) + elif (self.spec.type.name == 'double'): val = KuduValue.FromDouble(obj) + elif (self.spec.type.name == 'unixtime_micros'): + obj = to_unixtime_micros(obj) + val = KuduValue.FromInt(obj) else: - raise TypeError(obj) + raise TypeError("Cannot add predicate for kudu type <{0}>" + .format(self.spec.type.name)) return val @@ -2006,15 +2013,8 @@ cdef class PartialRow: self.row.SetStringCopy(i, deref(slc)) del slc elif t == KUDU_UNIXTIME_MICROS: - # String with custom format - # eg: ("2016-01-01", "%Y-%m-%d") - if type(value) is tuple: - self.row.SetUnixTimeMicros(i, <int64_t> - 
to_unixtime_micros(value[0], value[1])) - # datetime.datetime input or string with default format - else: - self.row.SetUnixTimeMicros(i, <int64_t> - to_unixtime_micros(value)) + self.row.SetUnixTimeMicros(i, <int64_t> + to_unixtime_micros(value)) else: raise TypeError("Cannot set kudu type <{0}>.".format(_type_names[t])) http://git-wip-us.apache.org/repos/asf/kudu/blob/4d1b1e97/python/kudu/tests/test_scanner.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py index 950f0dd..72a22a4 100644 --- a/python/kudu/tests/test_scanner.py +++ b/python/kudu/tests/test_scanner.py @@ -141,14 +141,14 @@ class TestScanner(TestScanBase): with self.assertRaises(TypeError): scanner.add_predicates([sv >= None]) - with self.assertRaises(kudu.KuduInvalidArgument): + with self.assertRaises(TypeError): scanner.add_predicates([sv >= 1]) with self.assertRaises(TypeError): scanner.add_predicates([sv.in_list(['testing', datetime.datetime.utcnow()])]) - with self.assertRaises(kudu.KuduInvalidArgument): + with self.assertRaises(TypeError): scanner.add_predicates([sv.in_list([ 'hello_20', 120 @@ -214,3 +214,36 @@ class TestScanner(TestScanBase): check_tuples = sorted(scanner.read_all_tuples()) # Avoid tight looping time.sleep(0.05) + + def verify_pred_type_scans(self, preds, row_indexes, count_only=False): + # Using the incoming list of predicates, verify that the row returned + # matches the inserted tuple at the row indexes specified in a + # slice object + scanner = self.type_table.scanner() + scanner.set_fault_tolerant() + scanner.add_predicates(preds) + scanner.set_projected_column_names(self.projected_names_w_o_float) + tuples = scanner.open().read_all_tuples() + + # verify rows + if count_only: + self.assertEqual(len(self.type_test_rows[row_indexes]), len(tuples)) + else: + self.assertEqual(sorted(self.type_test_rows[row_indexes]), tuples) + + def test_unixtime_micros_pred(self): + # 
Test unixtime_micros value predicate + self._test_unixtime_micros_pred() + + def test_bool_pred(self): + # Test a boolean value predicate + self._test_bool_pred() + + def test_double_pred(self): + # Test a double precision float predicate + self._test_double_pred() + + def test_float_pred(self): + # Test a single precision float predicate + # Does a row check count only + self._test_float_pred() http://git-wip-us.apache.org/repos/asf/kudu/blob/4d1b1e97/python/kudu/tests/test_scantoken.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py index 5a37486..da879d5 100644 --- a/python/kudu/tests/test_scantoken.py +++ b/python/kudu/tests/test_scantoken.py @@ -38,7 +38,7 @@ class TestScanToken(TestScanBase): def setUp(self): pass - def _subtest_serialize_thread_and_verify(self, tokens, expected_tuples): + def _subtest_serialize_thread_and_verify(self, tokens, expected_tuples, count_only=False): """ Given the input serialized tokens, spawn new threads, execute them and validate the results @@ -55,7 +55,10 @@ class TestScanToken(TestScanBase): for result in results: actual_tuples += result - self.assertEqual(sorted(expected_tuples), sorted(actual_tuples)) + if count_only: + self.assertEqual(expected_tuples, actual_tuples) + else: + self.assertEqual(sorted(expected_tuples), sorted(actual_tuples)) def test_scan_token_serde_threaded_with_named_projection(self): """ @@ -113,7 +116,7 @@ class TestScanToken(TestScanBase): with self.assertRaises(TypeError): builder.add_predicates([sv >= None]) - with self.assertRaises(kudu.KuduInvalidArgument): + with self.assertRaises(TypeError): builder.add_predicates([sv >= 1]) def test_scan_token_batch_by_batch_with_local_scanner(self): @@ -209,3 +212,34 @@ class TestScanToken(TestScanBase): tuples.extend(scanner.read_all_tuples()) self.assertEqual(sorted(self.tuples), sorted(tuples)) + + def verify_pred_type_scans(self, preds, 
row_indexes, count_only=False): + # Using the incoming list of predicates, verify that the row returned + # matches the inserted tuple at the row indexes specified in a + # slice object + builder = self.type_table.scan_token_builder() + builder.set_fault_tolerant() + builder.set_projected_column_names(self.projected_names_w_o_float) + builder.add_predicates(preds) + + # Verify rows + self._subtest_serialize_thread_and_verify(builder.build(), + self.type_test_rows[row_indexes], + count_only) + + def test_unixtime_micros_pred(self): + # Test unixtime_micros value predicate + self._test_unixtime_micros_pred() + + def test_bool_pred(self): + # Test a boolean value predicate + self._test_bool_pred() + + def test_double_pred(self): + # Test a double precision float predicate + self._test_double_pred() + + def test_float_pred(self): + # Test a single precision float predicate + # Does a row check count only + self._test_float_pred() http://git-wip-us.apache.org/repos/asf/kudu/blob/4d1b1e97/python/kudu/tests/util.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/util.py b/python/kudu/tests/util.py index 6a72a63..9f64e39 100644 --- a/python/kudu/tests/util.py +++ b/python/kudu/tests/util.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -17,6 +18,7 @@ # under the License. 
from kudu.compat import unittest +from kudu.client import Partitioning from kudu.tests.common import KuduTestBase import kudu import datetime @@ -58,6 +60,50 @@ class TestScanBase(KuduTestBase, unittest.TestCase): self.table = table self.tuples = tuples + # Create table to test all types + # for various predicate tests + table_name = 'type-test' + # Create schema, partitioning and then table + builder = kudu.schema_builder() + builder.add_column('key').type(kudu.int64).nullable(False).primary_key() + builder.add_column('unixtime_micros_val', type_=kudu.unixtime_micros, nullable=False) + builder.add_column('string_val', type_=kudu.string, compression=kudu.COMPRESSION_LZ4, encoding='prefix') + builder.add_column('bool_val', type_=kudu.bool) + builder.add_column('double_val', type_=kudu.double) + builder.add_column('int8_val', type_=kudu.int8) + builder.add_column('float_val', type_=kudu.float) + schema = builder.build() + + self.projected_names_w_o_float = [ + col for col in schema.names if col != 'float_val' + ] + + partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3) + + self.client.create_table(table_name, schema, partitioning) + self.type_table = self.client.table(table_name) + + # Insert new rows + self.type_test_rows = [ + (1, datetime.datetime(2016, 1, 1).replace(tzinfo=pytz.utc), + "Test One", True, 1.7976931348623157 * (10^308), 127, 3.402823 * (10^38)), + (2, datetime.datetime.utcnow().replace(tzinfo=pytz.utc), + "测试二", False, 200.1, -1, -150.2) + ] + session = self.client.new_session() + for row in self.type_test_rows: + op = self.type_table.new_insert() + for idx, val in enumerate(row): + op[idx] = val + session.apply(op) + session.flush() + + # Remove the float values from the type_test_rows tuples so we can + # compare the other vals + self.type_test_rows = [ + tuple[:-1] for tuple in self.type_test_rows + ] + def setUp(self): pass @@ -127,4 +173,40 @@ class 
TestScanBase(KuduTestBase, unittest.TestCase): for idx, val
in enumerate(row): op[idx] = val session.apply(op) - session.flush() \ No newline at end of file + session.flush() + + def _test_unixtime_micros_pred(self): + # Test unixtime_micros value predicate + self.verify_pred_type_scans( + preds=[ + self.type_table['unixtime_micros_val'] == ("2016-01-01", "%Y-%m-%d") + ], + row_indexes=slice(0,1) + ) + + def _test_bool_pred(self): + # Test a boolean value predicate + self.verify_pred_type_scans( + preds=[ + self.type_table['bool_val'] == False + ], + row_indexes=slice(1,2) + ) + + def _test_double_pred(self): + # Test a double precision float predicate + self.verify_pred_type_scans( + preds=[ + self.type_table['double_val'] < 200.11 + ], + row_indexes=slice(1,2) + ) + + def _test_float_pred(self): + self.verify_pred_type_scans( + preds=[ + self.type_table['float_val'] == 3.402823 * (10^38) + ], + row_indexes=slice(0, 1), + count_only=True + ) http://git-wip-us.apache.org/repos/asf/kudu/blob/4d1b1e97/python/kudu/util.py ---------------------------------------------------------------------- diff --git a/python/kudu/util.py b/python/kudu/util.py index 8533b04..f290425 100644 --- a/python/kudu/util.py +++ b/python/kudu/util.py @@ -46,6 +46,9 @@ def to_unixtime_micros(timestamp, format = "%Y-%m-%dT%H:%M:%S.%f"): --------- timestamp : datetime.datetime or string If a string is provided, a format must be provided as well. + A tuple may be provided in place of the timestamp with a + string value and a format. This is useful for predicates + and setting values where this method is indirectly called. Timezones provided in the string are not supported at this time. UTC unless provided in a datetime object. 
format : Required if a string timestamp is provided @@ -60,6 +63,8 @@ def to_unixtime_micros(timestamp, format = "%Y-%m-%dT%H:%M:%S.%f"): pass elif isinstance(timestamp, six.string_types): timestamp = datetime.datetime.strptime(timestamp, format) + elif isinstance(timestamp, tuple): + timestamp = datetime.datetime.strptime(timestamp[0], timestamp[1]) else: raise ValueError("Invalid timestamp type. " + "You must provide a datetime.datetime or a string.")
