Repository: kudu Updated Branches: refs/heads/master 1a062253e -> 74065d1ca
[python] Implement Scan Token API First attempt at implementing the [Scan Token API](http://gerrit.cloudera.org:8080/#/c/2443/) for the Python client. This patch should also resolve KUDU-1401. I included several unit tests, most of which were based on the current scanner unit tests. Change-Id: I710c93e51ab5f0f5ed038aaaf1925b58c576b655 Reviewed-on: http://gerrit.cloudera.org:8080/4367 Tested-by: Kudu Jenkins Reviewed-by: Dan Burkert <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/609546db Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/609546db Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/609546db Branch: refs/heads/master Commit: 609546dbc2f0f7ed9daeb0eaff2bbc07042b0ed8 Parents: 1a06225 Author: Jordan Birdsell <[email protected]> Authored: Sat Sep 10 18:06:08 2016 -0400 Committer: Dan Burkert <[email protected]> Committed: Tue Sep 13 16:58:23 2016 +0000 ---------------------------------------------------------------------- python/kudu/__init__.py | 3 +- python/kudu/client.pyx | 396 ++++++++++++++++++++++++++++++- python/kudu/libkudu_client.pxd | 49 +++- python/kudu/tests/test_scantoken.py | 162 +++++++++++++ python/setup.py | 2 +- 5 files changed, 604 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/609546db/python/kudu/__init__.py ---------------------------------------------------------------------- diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py index 155f2e5..1a1ff39 100644 --- a/python/kudu/__init__.py +++ b/python/kudu/__init__.py @@ -17,7 +17,8 @@ from kudu.client import (Client, Table, Scanner, Session, # noqa Insert, Update, Delete, Predicate, - TimeDelta, KuduError, + TimeDelta, KuduError, ScanTokenBuilder, + ScanToken, FLUSH_AUTO_BACKGROUND, FLUSH_AUTO_SYNC, FLUSH_MANUAL) 
http://git-wip-us.apache.org/repos/asf/kudu/blob/609546db/python/kudu/client.pyx ---------------------------------------------------------------------- diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx index f111da6..5394410 100644 --- a/python/kudu/client.pyx +++ b/python/kudu/client.pyx @@ -300,6 +300,22 @@ cdef class Client: check_status(self.cp.TableExists(c_name, &exists)) return exists + def deserialize_token_into_scanner(self, serialized_token): + """ + Deserializes a ScanToken using the client and returns a scanner. + + Parameters + ---------- + serialized_token : String + Serialized form of a ScanToken. + + Returns + ------- + scanner : Scanner + """ + token = ScanToken() + return token.deserialize_into_scanner(self, serialized_token) + def table(self, table_name): """ Construct a kudu.Table and retrieve its schema from the cluster. @@ -506,9 +522,9 @@ cdef class TabletServer: """ cdef: - KuduTabletServer* _tserver + const KuduTabletServer* _tserver - cdef _init(self, KuduTabletServer* tserver): + cdef _init(self, const KuduTabletServer* tserver): self._tserver = tserver return self @@ -642,6 +658,27 @@ cdef class Table: result.scanner = new KuduScanner(self.ptr()) return result + def scan_token_builder(self): + """ + Create a new ScanTokenBuilder for this table to build a series of + scan tokens. 
+ + Examples + -------- + builder = table.scan_token_builder() + builder.set_fault_tolerant().add_predicate(table['key'] > 10) + tokens = builder.build() + for token in tokens: + scanner = token.into_kudu_scanner() + scanner.open() + tuples = scanner.read_all_tuples() + + Returns + ------- + builder : ScanTokenBuilder + """ + return ScanTokenBuilder(self) + cdef inline KuduTable* ptr(self): return self.table.get() @@ -1074,7 +1111,7 @@ cdef class Scanner: KuduScanner* scanner bint is_open - def __cinit__(self, Table table): + def __cinit__(self, Table table = None): self.table = table self.scanner = NULL self.is_open = 0 @@ -1278,6 +1315,359 @@ cdef class Scanner: check_status(self.scanner.NextBatch(&batch.batch)) return batch +cdef class ScanToken: + """ + A ScanToken describes a partial scan of a Kudu table limited to a single + contiguous physical location. Using the KuduScanTokenBuilder, clients + can describe the desired scan, including predicates, bounds, timestamps, + and caching, and receive back a collection of scan tokens. + """ + cdef: + KuduScanToken* _token + + def __cinit__(self): + self._token = NULL + + def __dealloc__(self): + if self._token != NULL: + del self._token + + cdef _init(self, KuduScanToken* token): + self._token = token + return self + + def into_kudu_scanner(self): + """ + Returns a scanner under the current client. + + Returns + ------- + scanner : Scanner + """ + cdef: + Scanner result = Scanner() + KuduScanner* _scanner = NULL + check_status(self._token.IntoKuduScanner(&_scanner)) + result.scanner = _scanner + return result + + + def tablet(self): + """ + Returns the Tablet associated with this ScanToken + + Returns + ------- + tablet : Tablet + """ + tablet = Tablet() + return tablet._init(&self._token.tablet()) + + def serialize(self): + """ + Serialize token into a string. 
+ + Returns + ------- + serialized_token : string + """ + cdef string buf + check_status(self._token.Serialize(&buf)) + return frombytes(buf) + + def deserialize_into_scanner(self, Client client, serialized_token): + """ + Returns a new scanner from the serialized token created under + the provided Client. + + Parameters + ---------- + client : Client + serialized_token : string + + Returns + ------- + scanner : Scanner + """ + cdef: + Scanner result = Scanner() + KuduScanner* _scanner + check_status(self._token.DeserializeIntoScanner(client.cp, tobytes(serialized_token), &_scanner)) + result.scanner = _scanner + return result + + +cdef class ScanTokenBuilder: + """ + This class builds ScanTokens for a Table. + """ + cdef: + KuduScanTokenBuilder* _builder + Table _table + + def __cinit__(self, Table table): + self._table = table + self._builder = new KuduScanTokenBuilder(table.ptr()) + + def __dealloc__(self): + if self._builder != NULL: + del self._builder + + def set_projected_column_names(self, names): + """ + Sets the columns to be scanned. + Returns a reference to itself to facilitate chaining. + + Parameters + ---------- + names : list of strings + + Returns + ------- + self : ScanTokenBuilder + """ + cdef vector[string] v_names + for name in names: + v_names.push_back(tobytes(name)) + check_status(self._builder.SetProjectedColumnNames(v_names)) + return self + + def set_projected_column_indexes(self, indexes): + """ + Sets the columns to be scanned. + Returns a reference to itself to facilitate chaining. + + Parameters + ---------- + indexes : list of integers representing column indexes + + Returns + ------- + self : ScanTokenBuilder + """ + cdef vector[int] v_indexes = indexes + check_status(self._builder.SetProjectedColumnIndexes(v_indexes)) + return self + + def set_batch_size_bytes(self, batch_size): + """ + Sets the batch size in bytes. + Returns a reference to itself to facilitate chaining. 
+ + Parameters + ---------- + batch_size : Size of batch in bytes + + Returns + ------- + self : ScanTokenBuilder + """ + check_status(self._builder.SetBatchSizeBytes(batch_size)) + return self + + def set_timout_millis(self, millis): + """ + Sets the timeout in milliseconds. + Returns a reference to itself to facilitate chaining. + + Parameters + ---------- + millis : int64_t + timeout in milliseconds + + Returns + ------- + self : ScanTokenBuilder + """ + check_status(self._builder.SetTimeoutMillis(millis)) + return self + + def set_fault_tolerant(self): + """ + Makes the underlying KuduScanner fault tolerant. + Returns a reference to itself to facilitate chaining. + + Returns + ------- + self : ScanTokenBuilder + """ + check_status(self._builder.SetFaultTolerant()) + return self + + def add_predicates(self, preds): + """ + Add a list of scan predicates to the ScanTokenBuilder. Select columns + from the parent table and make comparisons to create predicates. + + Examples + -------- + c = table[col_name] + preds = [c >= 0, c <= 10] + builder.add_predicates(preds) + + Parameters + ---------- + preds : list of Predicate + """ + for pred in preds: + self.add_predicate(pred) + + cpdef add_predicate(self, Predicate pred): + """ + Add a scan predicates to the scan token. Select columns from the + parent table and make comparisons to create predicates. + + Examples + -------- + pred = table[col_name] <= 10 + builder.add_predicate(pred) + + Parameters + ---------- + pred : kudu.Predicate + + Returns + ------- + self : ScanTokenBuilder + """ + cdef KuduPredicate* clone + + # We clone the KuduPredicate so that the Predicate wrapper class can be + # reused + clone = pred.pred.Clone() + check_status(self._builder.AddConjunctPredicate(clone)) + + def new_bound(self): + """ + Returns a new instance of a ScanBound (subclass of PartialRow) to be + later set with add_lower_bound()/add_upper_bound(). 
+ + Returns + ------- + bound : ScanBound + """ + return ScanBound(self._table) + + def add_lower_bound(self, ScanBound bound): + """ + Sets the lower bound of the scan. + Returns a reference to itself to facilitate chaining. + + Parameters + ---------- + bound : ScanBound + + Returns + ------- + self : ScanTokenBuilder + """ + check_status(self._builder.AddLowerBound(deref(bound.row))) + return self + + def add_upper_bound(self, ScanBound bound): + """ + Sets the upper bound of the scan. + Returns a reference to itself to facilitate chaining. + + Parameters + ---------- + bound : ScanBound + + Returns + ------- + self : ScanTokenBuilder + """ + check_status(self._builder.AddUpperBound(deref(bound.row))) + return self + + def set_cache_blocks(self, cache_blocks): + """ + Sets the block caching policy. + Returns a reference to itself to facilitate chaining. + + Parameters + ---------- + cache_blocks : bool + + Returns + ------- + self : ScanTokenBuilder + """ + check_status(self._builder.SetCacheBlocks(cache_blocks)) + return self + + def build(self): + """ + Build the set of scan tokens. The builder may be reused after + this call. Returns a list of ScanTokens to be serialized and + executed in parallel with separate client instances. + + Returns + ------- + tokens : List[ScanToken] + """ + + cdef: + vector[KuduScanToken*] tokens + size_t i + + check_status(self._builder.Build(&tokens)) + + result = [] + for i in range(tokens.size()): + token = ScanToken() + result.append(token._init(tokens[i])) + return result + + +cdef class Tablet: + """ + Represents a remote Tablet. Contains the tablet id and Replicas associated + with the Kudu Tablet. Retrieved by the ScanToken.tablet() method. 
+ """ + cdef: + const KuduTablet* _tablet + vector[KuduReplica*] _replicas + + cdef _init(self, const KuduTablet* tablet): + self._tablet = tablet + return self + + def id(self): + return frombytes(self._tablet.id()) + + def replicas(self): + cdef size_t i + + result = [] + _replicas = self._tablet.replicas() + for i in range(_replicas.size()): + replica = Replica() + result.append(replica._init(_replicas[i])) + return result + +cdef class Replica: + """ + Represents a remote Tablet's replica. Retrieve a list of Replicas with the + Tablet.replicas() method. Contains the boolean is_leader and its + respective TabletServer object. + """ + cdef const KuduReplica* _replica + + cdef _init(self, const KuduReplica* replica): + self._replica = replica + return self + + def __dealloc__(self): + if self._replica != NULL: + del self._replica + + def is_leader(self): + return self._replica.is_leader() + + def ts(self): + ts = TabletServer() + return ts._init(&self._replica.ts()) cdef class KuduError: http://git-wip-us.apache.org/repos/asf/kudu/blob/609546db/python/kudu/libkudu_client.pxd ---------------------------------------------------------------------- diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd index e6592bb..546b2be 100644 --- a/python/kudu/libkudu_client.pxd +++ b/python/kudu/libkudu_client.pxd @@ -446,7 +446,10 @@ cdef extern from "kudu/client/value.h" namespace "kudu::client" nogil: cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil: - # Omitted KuduClient::ReplicaSelection enum + enum ReplicaSelection" kudu::client::KuduClient::ReplicaSelection": + LEADER_ONLY " kudu::client::KuduClient::LEADER_ONLY" + CLOSEST_REPLICA " kudu::client::KuduClient::CLOSEST_REPLICA" + FIRST_REPLICA " kudu::client::KuduClient::FIRST_REPLICA" cdef cppclass KuduClient: @@ -607,8 +610,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil: Status NextBatch(KuduScanBatch* batch) Status SetBatchSizeBytes(uint32_t 
batch_size) - # Pending definition of ReplicaSelection enum - # Status SetSelection(ReplicaSelection selection) + Status SetSelection(ReplicaSelection selection) Status SetReadMode(ReadMode read_mode) Status SetSnapshot(uint64_t snapshot_timestamp_micros) @@ -622,6 +624,47 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil: KuduSchema GetProjectionSchema() string ToString() + cdef cppclass KuduScanToken: + KuduScanToken() + + const KuduTablet& tablet() + + Status IntoKuduScanner(KuduScanner** scanner) + Status Serialize(string* buf) + Status DeserializeIntoScanner(KuduClient* client, + const string& serialized_token, + KuduScanner** scanner) + + cdef cppclass KuduScanTokenBuilder: + KuduScanTokenBuilder(KuduTable* table) + + Status SetProjectedColumnNames(const vector[string]& col_names) + Status SetProjectedColumnIndexes(const vector[int]& col_indexes) + Status SetBatchSizeBytes(uint32_t batch_size) + Status SetReadMode(ReadMode read_mode) + Status SetFaultTolerant() + Status SetSnapshotMicros(uint64_t snapshot_timestamp_micros) + Status SetSnapshotRaw(uint64_t snapshot_timestamp) + Status SetSelection(ReplicaSelection selection) + Status SetTimeoutMillis(int millis) + Status AddConjunctPredicate(KuduPredicate* pred) + Status AddLowerBound(const KuduPartialRow& key) + Status AddUpperBound(const KuduPartialRow& key) + Status SetCacheBlocks(c_bool cache_blocks) + Status Build(vector[KuduScanToken*]* tokens) + + cdef cppclass KuduTablet: + KuduTablet() + + const string& id() + const vector[const KuduReplica*]& replicas() + + cdef cppclass KuduReplica: + KuduReplica() + + c_bool is_leader() + const KuduTabletServer& ts() + cdef cppclass C_KuduError " kudu::client::KuduError": Status& status() http://git-wip-us.apache.org/repos/asf/kudu/blob/609546db/python/kudu/tests/test_scantoken.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py new 
file mode 100644 index 0000000..d855569 --- /dev/null +++ b/python/kudu/tests/test_scantoken.py @@ -0,0 +1,162 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from kudu.compat import unittest +from kudu.tests.common import KuduTestBase +import kudu +from multiprocessing import Pool + +def _get_scan_token_results(input): + client = kudu.Client("{0}:{1}".format(input[1], input[2])) + scanner = client.deserialize_token_into_scanner(input[0]) + scanner.open() + return scanner.read_all_tuples() + +class TestScanToken(KuduTestBase, unittest.TestCase): + + @classmethod + def setUpClass(self): + """ + Stolen from the test scanner given the similarity in + functionality. 
+ """ + super(TestScanToken, self).setUpClass() + + self.nrows = 100 + table = self.client.table(self.ex_table) + session = self.client.new_session() + + tuples = [] + for i in range(self.nrows): + op = table.new_insert() + tup = i, i * 2, 'hello_%d' % i if i % 2 == 0 else None + op['key'] = tup[0] + op['int_val'] = tup[1] + if i % 2 == 0: + op['string_val'] = tup[2] + elif i % 3 == 0: + op['string_val'] = None + session.apply(op) + tuples.append(tup) + session.flush() + + self.table = table + self.tuples = tuples + + def setUp(self): + pass + + def _subtest_serialize_thread_and_verify(self, tokens, expected_tuples): + """ + Given the input tokens, serialize them, run each one in a + separate worker process and validate the combined results + """ + input = [(token.serialize(), self.master_host, self.master_port) + for token in tokens] + + # Begin process pool + pool = Pool(len(input)) + results = pool.map(_get_scan_token_results, input) + + #Validate results + actual_tuples = [] + for result in results: + actual_tuples += result + + self.assertEqual(sorted(expected_tuples), sorted(actual_tuples)) + + def test_scan_token_serde_threaded_with_named_projection(self): + """ + Creates scan tokens, serializes them, delivers them to new + processes then executes them in parallel with separate clients. + """ + builder = self.table.scan_token_builder() + builder.set_projected_column_names(['key', 'string_val']).set_fault_tolerant() + + # Serialize execute and verify + self._subtest_serialize_thread_and_verify(builder.build(), + [(x[0], x[2]) for x in self.tuples]) + + def test_scan_token_serde_threaded_simple_predicate_and_index_projection(self): + """ + Creates scan tokens with predicates and an index projection, + serializes them, delivers them to new processes then executes + them in parallel with separate clients. 
+ """ + key = self.table['key'] + preds = [key >= 20, key <= 49] + + builder = self.table.scan_token_builder() + builder.set_projected_column_indexes([0, 1])\ + .set_fault_tolerant()\ + .add_predicates(preds) + + # Serialize execute and verify + self._subtest_serialize_thread_and_verify(builder.build(), + [x[0:2] for x in self.tuples[20:50]]) + + def test_scan_token_serde_threaded_with_bounds(self): + """ + Creates scan tokens with bounds, serializes them, + delivers them to new processes then executes them + in parallel with separate clients. + """ + builder = self.table.scan_token_builder() + lower_bound = builder.new_bound() + lower_bound['key'] = 50 + upper_bound = builder.new_bound() + upper_bound['key'] = 55 + builder.set_fault_tolerant()\ + .add_lower_bound(lower_bound)\ + .add_upper_bound(upper_bound) + + # Serialize execute and verify + self._subtest_serialize_thread_and_verify(builder.build(), + self.tuples[50:55]) + + def test_scan_token_invalid_predicates(self): + builder = self.table.scan_token_builder() + sv = self.table['string_val'] + + with self.assertRaises(TypeError): + builder.add_predicates([sv >= None]) + + with self.assertRaises(kudu.KuduInvalidArgument): + builder.add_predicates([sv >= 1]) + + def test_scan_token_batch_by_batch_with_local_scanner(self): + builder = self.table.scan_token_builder() + lower_bound = builder.new_bound() + lower_bound['key'] = 10 + upper_bound = builder.new_bound() + upper_bound['key'] = 90 + builder.set_fault_tolerant() \ + .add_lower_bound(lower_bound) \ + .add_upper_bound(upper_bound) + tokens = builder.build() + + tuples = [] + for token in tokens: + scanner = token.into_kudu_scanner() + scanner.open() + + while scanner.has_more_rows(): + batch = scanner.next_batch() + tuples.extend(batch.as_tuples()) + + self.assertEqual(sorted(tuples), self.tuples[10:90]) http://git-wip-us.apache.org/repos/asf/kudu/blob/609546db/python/setup.py ---------------------------------------------------------------------- diff --git 
a/python/setup.py b/python/setup.py index 82f0383..830a103 100644 --- a/python/setup.py +++ b/python/setup.py @@ -154,7 +154,7 @@ setup( 'build_ext': build_ext }, setup_requires=['pytest-runner'], - tests_require=['pytest'], + tests_require=['pytest', 'multiprocessing'], install_requires=['cython >= 0.21'], description=DESCRIPTION, long_description=LONG_DESCRIPTION,
