[python] - Expose additional scanner methods
This patch exposes the following remaining scanner methods of the C++
client in the Python client:
- set_cache_blocks
- keep_alive
- close
- get_current_server
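Additionally, this patch fixes an issue where Replica objects
inappropriately deallocated their underlying C++ replicas (these are
freed when the owning ScanToken is deallocated). It also makes
TabletServer track whether it owns its underlying KuduTabletServer,
adds == / != comparison for TabletServer, and includes tests.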
Change-Id: Ifa6070a96a5daca796d463ffc3ffcbe5f0a5e08a
Reviewed-on: http://gerrit.cloudera.org:8080/4888
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/13ffec6e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/13ffec6e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/13ffec6e
Branch: refs/heads/master
Commit: 13ffec6eae6f498b7c21ccdd2fdb66f53df07afb
Parents: c25aea4
Author: Jordan Birdsell <[email protected]>
Authored: Sun Oct 30 20:55:53 2016 -0400
Committer: Jean-Daniel Cryans <[email protected]>
Committed: Mon Nov 7 19:59:21 2016 +0000
----------------------------------------------------------------------
python/kudu/client.pyx | 90 ++++++++++++++++++++++++++++++--
python/kudu/libkudu_client.pxd | 5 +-
python/kudu/tests/test_scanner.py | 7 +--
python/kudu/tests/test_scantoken.py | 30 +++++++----
4 files changed, 113 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/13ffec6e/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 3cfd92a..501aaaa 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -447,6 +447,7 @@ cdef class Client:
result = []
for i in range(tservers.size()):
ts = TabletServer()
+ ts._own = 1  # wrapper deletes the pointer in __dealloc__; _init() must not reset this flag
result.append(ts._init(tservers[i]))
return result
@@ -592,15 +593,27 @@ cdef class TabletServer:
cdef:
const KuduTabletServer* _tserver
+ public bint _own
cdef _init(self, const KuduTabletServer* tserver):
self._tserver = tserver
+ # Do not reset _own: Cython zero-initialises it and owning callers set it before _init()
return self
def __dealloc__(self):
- if self._tserver != NULL:
+ if self._tserver != NULL and self._own:
del self._tserver
+ def __richcmp__(TabletServer self, other, int op):
+ if op not in (2, 3) or not isinstance(other, TabletServer):
+ return NotImplemented  # only == (Py_EQ, 2) and != (Py_NE, 3) are supported
+ same = ((self.uuid(), self.hostname(), self.port()) ==
+ (other.uuid(), other.hostname(), other.port()))
+ return same if op == 2 else not same
+
+ def __hash__(self):  # consistent with the (uuid, hostname, port) equality above
+ return hash((self.uuid(), self.hostname(), self.port()))
+
def uuid(self):
return frombytes(self._tserver.uuid())
@@ -1649,6 +1662,77 @@ cdef class Scanner:
check_status(self.scanner.NextBatch(&batch.batch))
return batch
+ def set_cache_blocks(self, cache_blocks):
+ """
+ Set the block caching policy for this scan.
+ Returns a reference to itself to facilitate chaining.
+
+ Parameters
+ ----------
+ cache_blocks : bool
+
+ Returns
+ -------
+ self : Scanner
+ """
+ check_status(self.scanner.SetCacheBlocks(cache_blocks))
+ return self
+
+ def keep_alive(self):
+ """
+ Keep the current remote scanner alive.
+
+ Keep the current remote scanner alive on the tablet server for an
+ additional time-to-live (set by a configuration flag on the tablet
+ server). This is useful if the interval between next_batch() calls is
+ long enough that the remote scanner might be garbage collected (the
+ default TTL is 60 seconds). This does not invalidate any previously
+ fetched results.
+
+ Returns
+ -------
+ self : Scanner
+ """
+ check_status(self.scanner.KeepAlive())
+ return self
+
+ def get_current_server(self):
+ """
+ Get the TabletServer that is currently handling the scan.
+
+ More concretely, this is the server that handled the most recent open()
+ or next_batch() RPC made by this scanner.
+
+ Returns
+ -------
+ tserver : TabletServer
+ """
+ cdef:
+ TabletServer tserver = TabletServer()
+ KuduTabletServer* tserver_p = NULL
+
+ check_status(self.scanner.GetCurrentServer(&tserver_p))
+ tserver._init(tserver_p)
+ tserver._own = 1  # we own *tserver_p; set after _init() so it cannot be clobbered
+ return tserver
+
+ def close(self):
+ """
+ Close the scanner.
+
+ Closing the scanner releases resources on the server. This call does
+ not block, and will not ever fail, even if the server cannot be
+ contacted.
+
+ Note: The scanner is reset to its initial state by this function.
+ You'll have to re-add any projection, predicates, etc. if you want to
+ reuse this object.
+ Note: When the Scanner object is garbage collected, this method is run.
+ This method call is only needed if you want to explicitly release the
+ resources on the server.
+ """
+ self.scanner.Close()
+
cdef class ScanToken:
"""
@@ -2113,10 +2197,6 @@ cdef class Replica:
self._replica = replica
return self
- def __dealloc__(self):
- if self._replica != NULL:
- del self._replica
-
def is_leader(self):
return self._replica.is_leader()
http://git-wip-us.apache.org/repos/asf/kudu/blob/13ffec6e/python/kudu/libkudu_client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index b56cc11..643e349 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -627,9 +627,8 @@ cdef extern from "kudu/client/client.h" namespace
"kudu::client" nogil:
c_bool HasMoreRows()
Status NextBatch(KuduScanBatch* batch)
Status SetBatchSizeBytes(uint32_t batch_size)
-
Status SetSelection(ReplicaSelection selection)
-
+ Status SetCacheBlocks(c_bool cache_blocks)  # wrapped by Scanner.set_cache_blocks
Status SetReadMode(ReadMode read_mode)
Status SetSnapshotMicros(uint64_t snapshot_timestamp_micros)
Status SetTimeoutMillis(int millis)
@@ -638,6 +637,8 @@ cdef extern from "kudu/client/client.h" namespace
"kudu::client" nogil:
Status SetFaultTolerant()
Status AddLowerBound(const KuduPartialRow& key)
Status AddExclusiveUpperBound(const KuduPartialRow& key)
+ Status KeepAlive()  # wrapped by Scanner.keep_alive
+ Status GetCurrentServer(KuduTabletServer** server)  # Scanner.get_current_server takes ownership of *server
KuduSchema GetProjectionSchema()
const ResourceMetrics& GetResourceMetrics()
http://git-wip-us.apache.org/repos/asf/kudu/blob/13ffec6e/python/kudu/tests/test_scanner.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scanner.py
b/python/kudu/tests/test_scanner.py
index 0b9aeb4..b483b38 100644
--- a/python/kudu/tests/test_scanner.py
+++ b/python/kudu/tests/test_scanner.py
@@ -211,14 +211,15 @@ class TestScanner(TestScanBase):
# Avoid tight looping
time.sleep(0.05)
- def test_resource_metrics(self):
+ def test_resource_metrics_and_cache_blocks(self):
"""
- Test getting the resource metrics after scanning.
+ Test getting the resource metrics after a scan with block caching
+ disabled via set_cache_blocks(False).
"""
# Build scanner and read through all batches and retrieve metrics.
scanner = self.table.scanner()
- scanner.set_fault_tolerant().open()
+ scanner.set_fault_tolerant().set_cache_blocks(False).open()  # set_cache_blocks() returns the scanner, so it chains
scanner.read_all_tuples()
metrics = scanner.get_resource_metrics()
http://git-wip-us.apache.org/repos/asf/kudu/blob/13ffec6e/python/kudu/tests/test_scantoken.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scantoken.py
b/python/kudu/tests/test_scantoken.py
index 115ac30..392e2a9 100644
--- a/python/kudu/tests/test_scantoken.py
+++ b/python/kudu/tests/test_scantoken.py
@@ -22,12 +22,16 @@ from kudu.tests.common import KuduTestBase
import kudu
from multiprocessing import Pool
import datetime
+import time  # NOTE(review): no line added by this patch uses `time`; consider dropping it
def _get_scan_token_results(input):
client = kudu.connect(input[1], input[2])
scanner = client.deserialize_token_into_scanner(input[0])
scanner.open()
- return scanner.read_all_tuples()
+ try:
+ return scanner.read_all_tuples()
+ finally:
+ scanner.close()  # Test explicit closing of scanner (also runs if the read fails)
class TestScanToken(TestScanBase):
@@ -115,6 +119,16 @@ class TestScanToken(TestScanBase):
with self.assertRaises(TypeError):
builder.add_predicates([sv >= 1])
+ def _subtest_open_and_confirm_leader_tserver(self, token):
+ leaders = [replica.ts() for replica in token.tablet().replicas()
+ if replica.is_leader()]
+ self.assertTrue(leaders, 'no leader replica found for tablet')
+ # The freshly opened scan is expected to be served by the leader.
+ scanner = token.into_kudu_scanner()
+ scanner.open()
+ self.assertEqual(scanner.get_current_server(), leaders[-1])
+ return scanner
+
def test_scan_token_batch_by_batch_with_local_scanner(self):
builder = self.table.scan_token_builder()
lower_bound = builder.new_bound()
@@ -128,8 +142,7 @@ class TestScanToken(TestScanBase):
tuples = []
for token in tokens:
- scanner = token.into_kudu_scanner()
- scanner.open()
+ scanner = self._subtest_open_and_confirm_leader_tserver(token)  # opens and checks get_current_server()
while scanner.has_more_rows():
batch = scanner.next_batch()
@@ -150,10 +163,10 @@ class TestScanToken(TestScanBase):
tuples = []
for token in tokens:
- scanner = token.into_kudu_scanner()
- scanner.open()
+ scanner = self._subtest_open_and_confirm_leader_tserver(token)
while scanner.has_more_rows():
+ scanner.keep_alive()  # exercise keep_alive() between batches; results are checked below
batch = scanner.next_batch()
tuples.extend(batch.as_tuples())
@@ -192,7 +205,7 @@ class TestScanToken(TestScanBase):
tuples = []
for token in tokens:
- scanner = token.into_kudu_scanner().open()
+ scanner = self._subtest_open_and_confirm_leader_tserver(token)
tuples.extend(scanner.read_all_tuples())
self.assertEqual(sorted(self.tuples[1:]), sorted(tuples))
@@ -204,7 +217,7 @@ class TestScanToken(TestScanBase):
tuples = []
for token in tokens:
- scanner = token.into_kudu_scanner().open()
+ scanner = self._subtest_open_and_confirm_leader_tserver(token)
tuples.extend(scanner.read_all_tuples())
self.assertEqual(sorted(self.tuples), sorted(tuples))
@@ -260,8 +273,7 @@ class TestScanToken(TestScanBase):
tuples = []
for token in tokens:
- scanner = token.into_kudu_scanner()
- scanner.open()
+ scanner = self._subtest_open_and_confirm_leader_tserver(token)
tuples.extend(scanner.read_all_tuples())
self.assertEqual(sorted(tuples),