This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch release-2.30.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.30.0 by this push:
     new def7a23  [BEAM-12180] Set typehint in UnbatchPandas (#14568)
     new e6791c0  Merge pull request #14772 from TheNeuralBit/BEAM-12180-CP
def7a23 is described below

commit def7a2357c27f2f9e27b4b629c9d908a075a3fe4
Author: Brian Hulette <[email protected]>
AuthorDate: Mon May 10 11:44:23 2021 -0700

    [BEAM-12180] Set typehint in UnbatchPandas (#14568)
    
    * Set typehint in UnbatchPandas
    
    * Fix typehints
    
    * match_is_named_tuple (not private)
    
    * TODO(BEAM-8538)
    
    * lint
---
 sdks/python/apache_beam/dataframe/schemas.py       |  3 +-
 sdks/python/apache_beam/dataframe/schemas_test.py  | 83 +++++++++++++++-------
 .../typehints/native_type_compatibility.py         |  4 +-
 sdks/python/apache_beam/typehints/schemas.py       |  6 +-
 4 files changed, 66 insertions(+), 30 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/schemas.py b/sdks/python/apache_beam/dataframe/schemas.py
index cc30cec..ee3c2a3 100644
--- a/sdks/python/apache_beam/dataframe/schemas.py
+++ b/sdks/python/apache_beam/dataframe/schemas.py
@@ -281,7 +281,8 @@ def _unbatch_transform(proxy, include_indexes):
     ctor = element_type_from_dataframe(proxy, include_indexes=include_indexes)
 
     return beam.ParDo(
-        _UnbatchWithIndex(ctor) if include_indexes else _UnbatchNoIndex(ctor))
+        _UnbatchWithIndex(ctor) if include_indexes else _UnbatchNoIndex(ctor)
+    ).with_output_types(ctor)
   elif isinstance(proxy, pd.Series):
     # Raise a TypeError if proxy has an unknown type
     output_type = _dtype_to_fieldtype(proxy.dtype)
diff --git a/sdks/python/apache_beam/dataframe/schemas_test.py b/sdks/python/apache_beam/dataframe/schemas_test.py
index 8b1159c..af30e25 100644
--- a/sdks/python/apache_beam/dataframe/schemas_test.py
+++ b/sdks/python/apache_beam/dataframe/schemas_test.py
@@ -36,6 +36,8 @@ from apache_beam.dataframe import transforms
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
 
 Simple = typing.NamedTuple(
     'Simple', [('name', unicode), ('id', int), ('height', float)])
@@ -65,41 +67,59 @@ def matches_df(expected):
 # dtype. For example:
 #   pd.Series([b'abc'], dtype=bytes).dtype != 'S'
 #   pd.Series([b'abc'], dtype=bytes).astype(bytes).dtype == 'S'
+# (test data, pandas_type, column_name, beam_type)
 COLUMNS = [
-    ([375, 24, 0, 10, 16], np.int32, 'i32'),
-    ([375, 24, 0, 10, 16], np.int64, 'i64'),
-    ([375, 24, None, 10, 16], pd.Int32Dtype(), 'i32_nullable'),
-    ([375, 24, None, 10, 16], pd.Int64Dtype(), 'i64_nullable'),
-    ([375., 24., None, 10., 16.], np.float64, 'f64'),
-    ([375., 24., None, 10., 16.], np.float32, 'f32'),
-    ([True, False, True, True, False], bool, 'bool'),
-    (['Falcon', 'Ostrich', None, 3.14, 0], object, 'any'),
-    ([True, False, True, None, False], pd.BooleanDtype(), 'bool_nullable'),
+    ([375, 24, 0, 10, 16], np.int32, 'i32', np.int32),
+    ([375, 24, 0, 10, 16], np.int64, 'i64', np.int64),
+    ([375, 24, None, 10, 16],
+     pd.Int32Dtype(),
+     'i32_nullable',
+     typing.Optional[np.int32]),
+    ([375, 24, None, 10, 16],
+     pd.Int64Dtype(),
+     'i64_nullable',
+     typing.Optional[np.int64]),
+    ([375., 24., None, 10., 16.],
+     np.float64,
+     'f64',
+     typing.Optional[np.float64]),
+    ([375., 24., None, 10., 16.],
+     np.float32,
+     'f32',
+     typing.Optional[np.float32]),
+    ([True, False, True, True, False], bool, 'bool', bool),
+    (['Falcon', 'Ostrich', None, 3.14, 0], object, 'any', typing.Any),
+    ([True, False, True, None, False],
+     pd.BooleanDtype(),
+     'bool_nullable',
+     typing.Optional[bool]),
     (['Falcon', 'Ostrich', None, 'Aardvark', 'Elephant'],
      pd.StringDtype(),
-     'strdtype'),
-]  # type: typing.List[typing.Tuple[typing.List[typing.Any], typing.Any, str]]
+     'strdtype',
+     typing.Optional[str]),
+]  # type: typing.List[typing.Tuple[typing.List[typing.Any], typing.Any, str, typing.Any]]
 
-NICE_TYPES_DF = pd.DataFrame(columns=[name for _, _, name in COLUMNS])
-for arr, dtype, name in COLUMNS:
+NICE_TYPES_DF = pd.DataFrame(columns=[name for _, _, name, _ in COLUMNS])
+for arr, dtype, name, _ in COLUMNS:
   NICE_TYPES_DF[name] = pd.Series(arr, dtype=dtype, name=name).astype(dtype)
 
 NICE_TYPES_PROXY = NICE_TYPES_DF[:0]
 
-SERIES_TESTS = [(pd.Series(arr, dtype=dtype, name=name), arr) for arr,
-                dtype,
-                name in COLUMNS]
+SERIES_TESTS = [(pd.Series(arr, dtype=dtype, name=name), arr, beam_type)
+                for (arr, dtype, name, beam_type) in COLUMNS]
 
 _TEST_ARRAYS = [
-    arr for arr, _, _ in COLUMNS
+    arr for (arr, _, _, _) in COLUMNS
 ]  # type: typing.List[typing.List[typing.Any]]
 DF_RESULT = list(zip(*_TEST_ARRAYS))
-INDEX_DF_TESTS = [
-    (NICE_TYPES_DF.set_index([name for _, _, name in COLUMNS[:i]]), DF_RESULT)
-    for i in range(1, len(COLUMNS) + 1)
-]
+BEAM_SCHEMA = typing.NamedTuple(  # type: ignore
+    'BEAM_SCHEMA', [(name, beam_type) for _, _, name, beam_type in COLUMNS])
+INDEX_DF_TESTS = [(
+    NICE_TYPES_DF.set_index([name for _, _, name, _ in COLUMNS[:i]]),
+    DF_RESULT,
+    BEAM_SCHEMA) for i in range(1, len(COLUMNS) + 1)]
 
-NOINDEX_DF_TESTS = [(NICE_TYPES_DF, DF_RESULT)]
+NOINDEX_DF_TESTS = [(NICE_TYPES_DF, DF_RESULT, BEAM_SCHEMA)]
 
 PD_VERSION = tuple(int(n) for n in pd.__version__.split('.'))
 
@@ -203,8 +223,18 @@ class SchemasTest(unittest.TestCase):
                 proxy=schemas.generate_proxy(Animal)))
         assert_that(res, equal_to([('Falcon', 375.), ('Parrot', 25.)]))
 
+  def assert_typehints_equal(self, left, right):
+    left = typehints.normalize(left)
+    right = typehints.normalize(right)
+
+    if match_is_named_tuple(left):
+      self.assertTrue(match_is_named_tuple(right))
+      self.assertEqual(left.__annotations__, right.__annotations__)
+    else:
+      self.assertEqual(left, right)
+
   @parameterized.expand(SERIES_TESTS + NOINDEX_DF_TESTS)
-  def test_unbatch_no_index(self, df_or_series, rows):
+  def test_unbatch_no_index(self, df_or_series, rows, beam_type):
     proxy = df_or_series[:0]
 
     with TestPipeline() as p:
@@ -212,10 +242,15 @@ class SchemasTest(unittest.TestCase):
           p | beam.Create([df_or_series[::2], df_or_series[1::2]])
           | schemas.UnbatchPandas(proxy))
 
+      # Verify that the unbatched PCollection has the expected typehint
+      # TODO(BEAM-8538): typehints should support NamedTuple so we can use
+      # typehints.is_consistent_with here instead
+      self.assert_typehints_equal(res.element_type, beam_type)
+
       assert_that(res, equal_to(rows))
 
   @parameterized.expand(SERIES_TESTS + INDEX_DF_TESTS)
-  def test_unbatch_with_index(self, df_or_series, rows):
+  def test_unbatch_with_index(self, df_or_series, rows, _):
     proxy = df_or_series[:0]
 
     with TestPipeline() as p:
diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py
index 2b738bf..d77727e 100644
--- a/sdks/python/apache_beam/typehints/native_type_compatibility.py
+++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py
@@ -107,7 +107,7 @@ def _match_is_exactly_iterable(user_type):
   return getattr(user_type, '__origin__', None) is expected_origin
 
 
-def _match_is_named_tuple(user_type):
+def match_is_named_tuple(user_type):
   return (
       _safe_issubclass(user_type, typing.Tuple) and
       hasattr(user_type, '_field_types'))
@@ -234,7 +234,7 @@ def convert_to_beam_type(typ):
       # We just convert it to Any for now.
       # This MUST appear before the entry for the normal Tuple.
       _TypeMapEntry(
-          match=_match_is_named_tuple, arity=0, beam_type=typehints.Any),
+          match=match_is_named_tuple, arity=0, beam_type=typehints.Any),
       _TypeMapEntry(
           match=_match_issubclass(typing.Tuple),
           arity=-1,
diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py
index 6a299bd..5daf68a 100644
--- a/sdks/python/apache_beam/typehints/schemas.py
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -70,10 +70,10 @@ from apache_beam.portability.api import schema_pb2
 from apache_beam.typehints import row_type
 from apache_beam.typehints.native_type_compatibility import _get_args
 from apache_beam.typehints.native_type_compatibility import _match_is_exactly_mapping
-from apache_beam.typehints.native_type_compatibility import _match_is_named_tuple
 from apache_beam.typehints.native_type_compatibility import _match_is_optional
 from apache_beam.typehints.native_type_compatibility import _safe_issubclass
 from apache_beam.typehints.native_type_compatibility import extract_optional_type
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
 from apache_beam.utils import proto_utils
 from apache_beam.utils.timestamp import Timestamp
 
@@ -148,7 +148,7 @@ def named_fields_from_schema(
 
 
 def typing_to_runner_api(type_):
-  if _match_is_named_tuple(type_):
+  if match_is_named_tuple(type_):
     schema = None
     if hasattr(type_, _BEAM_SCHEMA_ID):
       schema = SCHEMA_REGISTRY.get_schema_by_id(getattr(type_, _BEAM_SCHEMA_ID))
@@ -287,7 +287,7 @@ def schema_from_element_type(element_type):  # (type) -> schema_pb2.Schema
     # TODO(BEAM-10722): Make sure beam.Row generated schemas are registered and
     # de-duped
     return named_fields_to_schema(element_type._fields)
-  elif _match_is_named_tuple(element_type):
+  elif match_is_named_tuple(element_type):
     return named_tuple_to_schema(element_type)
   else:
     raise TypeError(

Reply via email to