Repository: kudu Updated Branches: refs/heads/master f6777db35 -> 997ad765f
[python]: Add Pandas Support to Scanner. This patch adds support for converting scanner results to a Pandas DataFrame, and includes a few basic unit tests. Change-Id: I90fab5c0c42448bcc17ea22be37420c6ef2f4915 Reviewed-on: http://gerrit.cloudera.org:8080/10809 Tested-by: Kudu Jenkins Reviewed-by: Grant Henke <granthe...@apache.org> Reviewed-by: Todd Lipcon <t...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/997ad765 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/997ad765 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/997ad765 Branch: refs/heads/master Commit: 997ad765fb4a0b19c87e29424c46f72c23c51bd0 Parents: f6777db Author: Jordan Birdsell <jtbirds...@apache.org> Authored: Sun Jun 24 18:49:28 2018 -0400 Committer: Jordan Birdsell <jtbirds...@apache.org> Committed: Mon Aug 6 17:56:02 2018 +0000 ---------------------------------------------------------------------- python/kudu/__init__.py | 3 +- python/kudu/client.pyx | 89 ++++++++++++++++++++++++++++++++++ python/kudu/tests/test_scanner.py | 61 +++++++++++++++++++++++ python/setup.py | 2 +- 4 files changed, 153 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/997ad765/python/kudu/__init__.py ---------------------------------------------------------------------- diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py index 8939545..1146f05 100644 --- a/python/kudu/__init__.py +++ b/python/kudu/__init__.py @@ -30,7 +30,8 @@ from kudu.client import (Client, Table, Scanner, Session, # noqa READ_YOUR_WRITES, EXCLUSIVE_BOUND, INCLUSIVE_BOUND, - CLIENT_SUPPORTS_DECIMAL) + CLIENT_SUPPORTS_DECIMAL, + CLIENT_SUPPORTS_PANDAS) from kudu.errors import (KuduException, KuduBadStatus, KuduNotFound, # noqa KuduNotSupported, http://git-wip-us.apache.org/repos/asf/kudu/blob/997ad765/python/kudu/client.pyx 
---------------------------------------------------------------------- diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx index 39093b1..095488d 100644 --- a/python/kudu/client.pyx +++ b/python/kudu/client.pyx @@ -46,6 +46,12 @@ IF PYKUDU_INT128_SUPPORTED == 1: ELSE: CLIENT_SUPPORTS_DECIMAL = False +try: + import pandas + CLIENT_SUPPORTS_PANDAS = True +except ImportError: + CLIENT_SUPPORTS_PANDAS = False + # Replica selection enums LEADER_ONLY = ReplicaSelection_Leader CLOSEST_REPLICA = ReplicaSelection_Closest @@ -109,6 +115,26 @@ def _check_convert_range_bound_type(bound): except KeyError: invalid_bound_type(bound) +def _correct_pandas_data_type(dtype): + """ + This method returns the correct Pandas data type for some data types that + are converted incorrectly by Pandas. + + Returns + ------- + pdtype : type + """ + import numpy as np + if dtype == "int8": + return np.int8 + if dtype == "int16": + return np.int16 + if dtype == "int32": + return np.int32 + if dtype == "float": + return np.float32 + else: + return None cdef class TimeDelta: """ @@ -1970,6 +1996,18 @@ cdef class Scanner: return tuples + def xbatches(self): + """ + This method acts as a generator to enable more effective memory management + by yielding batches of tuples. + """ + + self.ensure_open() + + while self.has_more_rows(): + yield self.next_batch().as_tuples() + + def read_next_batch_tuples(self): return self.next_batch().as_tuples() @@ -2059,6 +2097,57 @@ cdef class Scanner: """ self.scanner.Close() + def to_pandas(self, index=None, coerce_float=False): + """ + Returns the contents of this Scanner to a Pandas DataFrame. + + This is only available if Pandas is installed. + + Note: This should only be used if the results from the scanner are expected + to be small, as Pandas will load the entire contents into memory. 
+ + Parameters + ---------- + index : string, list of fields + Field or list of fields to use as the index + coerce_float : boolean + Attempt to convert decimal values to floating point (double precision). + + Returns + ------- + dataframe : DataFrame + """ + import pandas as pd + + self.ensure_open() + + # Here we are using list comprehension with the batch generator to avoid + # doubling our memory footprint. + dfs = [ pd.DataFrame.from_records(batch, + index=index, + coerce_float=coerce_float, + columns=self.get_projection_schema().names) + for batch in self.xbatches() + if len(batch) != 0 + ] + + df = pd.concat(dfs, ignore_index=not(bool(index))) + + types = {} + for column in self.get_projection_schema(): + pandas_type = _correct_pandas_data_type(column.type.name) + + if pandas_type is not None and \ + not(column.nullable and df[column.name].isnull().any()): + types[column.name] = pandas_type + + for col, dtype in types.items(): + df[col] = df[col].astype(dtype, copy=False) + + return df + + + cdef class ScanToken: """ http://git-wip-us.apache.org/repos/asf/kudu/blob/997ad765/python/kudu/tests/test_scanner.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py index ef8cc9c..95afdd5 100644 --- a/python/kudu/tests/test_scanner.py +++ b/python/kudu/tests/test_scanner.py @@ -24,6 +24,7 @@ from kudu.tests.common import KuduTestBase, TimeoutError import kudu import datetime import time +import pytest class TestScanner(TestScanBase): @@ -326,3 +327,63 @@ class TestScanner(TestScanBase): scanner.open() self.assertEqual(sorted(scanner.read_all_tuples()), sorted(self.tuples)) + + def test_scanner_to_pandas_types(self): + """ + This test confirms that data types are converted as expected to Pandas. 
+ """ + import numpy as np + scanner = self.type_table.scanner() + df = scanner.to_pandas() + types = df.dtypes + + if kudu.CLIENT_SUPPORTS_DECIMAL: + self.assertEqual(types[0], np.int64) + self.assertEqual(types[1], 'datetime64[ns, UTC]') + self.assertEqual(types[2], np.object) + self.assertEqual(types[3], np.object) + self.assertEqual(types[4], np.bool) + self.assertEqual(types[5], np.float64) + self.assertEqual(types[6], np.int8) + self.assertEqual(types[7], np.object) + self.assertEqual(types[8], np.float32) + else: + self.assertEqual(types[0], np.int64) + self.assertEqual(types[1], 'datetime64[ns, UTC]') + self.assertEqual(types[3], np.object) + self.assertEqual(types[4], np.bool) + self.assertEqual(types[5], np.float64) + self.assertEqual(types[6], np.int8) + self.assertEqual(types[7], np.object) + self.assertEqual(types[8], np.float32) + + def test_scanner_to_pandas_row_count(self): + """ + This test confirms that the record counts match between Pandas and the scanner. + """ + scanner = self.type_table.scanner() + scanner_count = len(scanner.read_all_tuples()) + scanner = self.type_table.scanner() + df = scanner.to_pandas() + self.assertEqual(scanner_count, df.shape[0]) + + def test_scanner_to_pandas_index(self): + """ + This test confirms that an index is correctly applied. + """ + scanner = self.type_table.scanner() + df = scanner.to_pandas(index='key') + self.assertEqual(df.index.name, 'key') + self.assertEqual(list(df.index), [1, 2]) + + @pytest.mark.skipif(not(kudu.CLIENT_SUPPORTS_DECIMAL), + reason="Decimal is not supported on this client.") + def test_scanner_to_pandas_index(self): + """ + This test confirms that a decimal column is coerced to a double when specified. 
+ """ + import numpy as np + scanner = self.type_table.scanner() + df = scanner.to_pandas(coerce_float=True) + types = df.dtypes + self.assertEqual(types[2], np.float64) http://git-wip-us.apache.org/repos/asf/kudu/blob/997ad765/python/setup.py ---------------------------------------------------------------------- diff --git a/python/setup.py b/python/setup.py index 7f34a50..44a1b54 100644 --- a/python/setup.py +++ b/python/setup.py @@ -194,7 +194,7 @@ setup( # # See https://docs.pytest.org/en/latest/changelog.html#id164 and # https://pypi.org/project/pytest-timeout/#id5 for more details. - tests_require=['pytest >=2.8,<3.3', 'pytest-timeout >=1.1.0,<1.2.1'], + tests_require=['pytest >=2.8,<3.3', 'pytest-timeout >=1.1.0,<1.2.1', 'pandas <0.21'], install_requires=['cython >= 0.21', 'pytz', 'six'], description=DESCRIPTION, long_description=LONG_DESCRIPTION,