Repository: kudu
Updated Branches:
  refs/heads/master f6777db35 -> 997ad765f


[python] : Add Pandas Support to Scanner

This patch adds support for converting scanner results to a
Pandas DataFrame. There are a few basic unit tests included
in this patch as well.

Change-Id: I90fab5c0c42448bcc17ea22be37420c6ef2f4915
Reviewed-on: http://gerrit.cloudera.org:8080/10809
Tested-by: Kudu Jenkins
Reviewed-by: Grant Henke <granthe...@apache.org>
Reviewed-by: Todd Lipcon <t...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/997ad765
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/997ad765
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/997ad765

Branch: refs/heads/master
Commit: 997ad765fb4a0b19c87e29424c46f72c23c51bd0
Parents: f6777db
Author: Jordan Birdsell <jtbirds...@apache.org>
Authored: Sun Jun 24 18:49:28 2018 -0400
Committer: Jordan Birdsell <jtbirds...@apache.org>
Committed: Mon Aug 6 17:56:02 2018 +0000

----------------------------------------------------------------------
 python/kudu/__init__.py           |  3 +-
 python/kudu/client.pyx            | 89 ++++++++++++++++++++++++++++++++++
 python/kudu/tests/test_scanner.py | 61 +++++++++++++++++++++++
 python/setup.py                   |  2 +-
 4 files changed, 153 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/997ad765/python/kudu/__init__.py
----------------------------------------------------------------------
diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py
index 8939545..1146f05 100644
--- a/python/kudu/__init__.py
+++ b/python/kudu/__init__.py
@@ -30,7 +30,8 @@ from kudu.client import (Client, Table, Scanner, Session,  # noqa
                          READ_YOUR_WRITES,
                          EXCLUSIVE_BOUND,
                          INCLUSIVE_BOUND,
-                         CLIENT_SUPPORTS_DECIMAL)
+                         CLIENT_SUPPORTS_DECIMAL,
+                         CLIENT_SUPPORTS_PANDAS)
 
 from kudu.errors import (KuduException, KuduBadStatus, KuduNotFound,  # noqa
                          KuduNotSupported,

http://git-wip-us.apache.org/repos/asf/kudu/blob/997ad765/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 39093b1..095488d 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -46,6 +46,12 @@ IF PYKUDU_INT128_SUPPORTED == 1:
 ELSE:
     CLIENT_SUPPORTS_DECIMAL = False
 
+try:
+    import pandas
+    CLIENT_SUPPORTS_PANDAS = True
+except ImportError:
+    CLIENT_SUPPORTS_PANDAS = False
+
 # Replica selection enums
 LEADER_ONLY = ReplicaSelection_Leader
 CLOSEST_REPLICA = ReplicaSelection_Closest
@@ -109,6 +115,26 @@ def _check_convert_range_bound_type(bound):
         except KeyError:
             invalid_bound_type(bound)
 
+def _correct_pandas_data_type(dtype):
+    """
+    Map a column type name ("int8", "int16", "int32" or "float") to the NumPy
+    type for the data types that are converted incorrectly by Pandas.
+
+    Returns
+    -------
+    pdtype : type, or None for any other type name
+    """
+    import numpy as np
+    if dtype == "int8":
+        return np.int8
+    if dtype == "int16":
+        return np.int16
+    if dtype == "int32":
+        return np.int32
+    if dtype == "float":
+        return np.float32
+    else:
+        return None
 
 cdef class TimeDelta:
     """
@@ -1970,6 +1996,18 @@ cdef class Scanner:
 
         return tuples
 
+    def xbatches(self):
+        """
+        This method acts as a generator to enable more effective memory management
+        by yielding batches of tuples.
+        """
+
+        self.ensure_open()
+
+        while self.has_more_rows():
+            yield self.next_batch().as_tuples()
+
+
     def read_next_batch_tuples(self):
         return self.next_batch().as_tuples()
 
@@ -2059,6 +2097,57 @@ cdef class Scanner:
         """
         self.scanner.Close()
 
+    def to_pandas(self, index=None, coerce_float=False):
+        """
+        Returns the contents of this Scanner as a Pandas DataFrame.
+
+        This is only available if Pandas is installed.
+
+        Note: This should only be used if the results from the scanner are expected
+        to be small, as Pandas will load the entire contents into memory.
+
+        Parameters
+        ----------
+        index : string, list of fields
+            Field or list of fields to use as the index
+        coerce_float : boolean
+            Attempt to convert decimal values to floating point (double precision).
+
+        Returns
+        -------
+        dataframe : DataFrame (empty if the scan returns no rows)
+        """
+        import pandas as pd
+
+        self.ensure_open()
+
+        # Here we are using list comprehension with the batch generator to avoid
+        # doubling our memory footprint.
+        dfs = [ pd.DataFrame.from_records(batch,
+                                          index=index,
+                                          coerce_float=coerce_float,
+                                          columns=self.get_projection_schema().names)
+                for batch in self.xbatches()
+                if len(batch) != 0
+                ]
+
+        if not dfs:
+            # pd.concat() raises ValueError on an empty list (scan with no rows).
+            empty = pd.DataFrame(columns=self.get_projection_schema().names)
+            dfs = [empty.set_index(index) if index else empty]
+
+        df = pd.concat(dfs, ignore_index=not(bool(index)))
+
+        for column in self.get_projection_schema():
+            pandas_type = _correct_pandas_data_type(column.type.name)
+            # Index fields are not columns of df; columns with nulls can't be narrowed.
+            if pandas_type is None or column.name not in df.columns:
+                continue
+            if not(column.nullable and df[column.name].isnull().any()):
+                df[column.name] = df[column.name].astype(pandas_type, copy=False)
+
+        return df
+
 
 cdef class ScanToken:
     """

http://git-wip-us.apache.org/repos/asf/kudu/blob/997ad765/python/kudu/tests/test_scanner.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py
index ef8cc9c..95afdd5 100644
--- a/python/kudu/tests/test_scanner.py
+++ b/python/kudu/tests/test_scanner.py
@@ -24,6 +24,7 @@ from kudu.tests.common import KuduTestBase, TimeoutError
 import kudu
 import datetime
 import time
+import pytest
 
 
 class TestScanner(TestScanBase):
@@ -326,3 +327,63 @@ class TestScanner(TestScanBase):
             scanner.open()
             self.assertEqual(sorted(scanner.read_all_tuples()),
                              sorted(self.tuples))
+
+    def test_scanner_to_pandas_types(self):
+        """
+        This test confirms that data types are converted as expected to Pandas.
+        """
+        import numpy as np
+        scanner = self.type_table.scanner()
+        df = scanner.to_pandas()
+        types = df.dtypes
+
+        if kudu.CLIENT_SUPPORTS_DECIMAL:
+            self.assertEqual(types[0], np.int64)
+            self.assertEqual(types[1], 'datetime64[ns, UTC]')
+            self.assertEqual(types[2], np.object_)  # np.object was removed in NumPy 1.24
+            self.assertEqual(types[3], np.object_)
+            self.assertEqual(types[4], np.bool_)  # np.bool was removed in NumPy 1.24
+            self.assertEqual(types[5], np.float64)
+            self.assertEqual(types[6], np.int8)
+            self.assertEqual(types[7], np.object_)
+            self.assertEqual(types[8], np.float32)
+        else:
+            self.assertEqual(types[0], np.int64)
+            self.assertEqual(types[1], 'datetime64[ns, UTC]')
+            self.assertEqual(types[3], np.object_)
+            self.assertEqual(types[4], np.bool_)
+            self.assertEqual(types[5], np.float64)
+            self.assertEqual(types[6], np.int8)
+            self.assertEqual(types[7], np.object_)
+            self.assertEqual(types[8], np.float32)
+
+    def test_scanner_to_pandas_row_count(self):
+        """
+        This test confirms that the record counts match between Pandas and the scanner.
+        """
+        scanner = self.type_table.scanner()
+        scanner_count = len(scanner.read_all_tuples())
+        scanner = self.type_table.scanner()
+        df = scanner.to_pandas()
+        self.assertEqual(scanner_count, df.shape[0])
+
+    def test_scanner_to_pandas_index(self):
+        """
+        This test confirms that an index is correctly applied.
+        """
+        scanner = self.type_table.scanner()
+        df = scanner.to_pandas(index='key')
+        self.assertEqual(df.index.name, 'key')
+        self.assertEqual(list(df.index), [1, 2])
+
+    @pytest.mark.skipif(not(kudu.CLIENT_SUPPORTS_DECIMAL),
+                        reason="Decimal is not supported on this client.")
+    def test_scanner_to_pandas_coerce_float(self):
+        """
+        This test confirms that a decimal column is coerced to a double when specified.
+        """
+        import numpy as np
+        scanner = self.type_table.scanner()
+        df = scanner.to_pandas(coerce_float=True)
+        types = df.dtypes
+        self.assertEqual(types[2], np.float64)

http://git-wip-us.apache.org/repos/asf/kudu/blob/997ad765/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 7f34a50..44a1b54 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -194,7 +194,7 @@ setup(
     #
     # See https://docs.pytest.org/en/latest/changelog.html#id164 and
     # https://pypi.org/project/pytest-timeout/#id5 for more details.
-    tests_require=['pytest >=2.8,<3.3', 'pytest-timeout >=1.1.0,<1.2.1'],
+    tests_require=['pytest >=2.8,<3.3', 'pytest-timeout >=1.1.0,<1.2.1', 'pandas <0.21'],
     install_requires=['cython >= 0.21', 'pytz', 'six'],
     description=DESCRIPTION,
     long_description=LONG_DESCRIPTION,

Reply via email to