This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch release-2.30.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.30.0 by this push:
new def7a23 [BEAM-12180] Set typehint in UnbatchPandas (#14568)
new e6791c0 Merge pull request #14772 from TheNeuralBit/BEAM-12180-CP
def7a23 is described below
commit def7a2357c27f2f9e27b4b629c9d908a075a3fe4
Author: Brian Hulette <[email protected]>
AuthorDate: Mon May 10 11:44:23 2021 -0700
[BEAM-12180] Set typehint in UnbatchPandas (#14568)
* Set typehint in UnbatchPandas
* Fix typehints
* match_is_named_tuple (not private)
* TODO(BEAM-8538)
* lint
---
sdks/python/apache_beam/dataframe/schemas.py | 3 +-
sdks/python/apache_beam/dataframe/schemas_test.py | 83 +++++++++++++++-------
.../typehints/native_type_compatibility.py | 4 +-
sdks/python/apache_beam/typehints/schemas.py | 6 +-
4 files changed, 66 insertions(+), 30 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/schemas.py b/sdks/python/apache_beam/dataframe/schemas.py
index cc30cec..ee3c2a3 100644
--- a/sdks/python/apache_beam/dataframe/schemas.py
+++ b/sdks/python/apache_beam/dataframe/schemas.py
@@ -281,7 +281,8 @@ def _unbatch_transform(proxy, include_indexes):
ctor = element_type_from_dataframe(proxy, include_indexes=include_indexes)
return beam.ParDo(
- _UnbatchWithIndex(ctor) if include_indexes else _UnbatchNoIndex(ctor))
+ _UnbatchWithIndex(ctor) if include_indexes else _UnbatchNoIndex(ctor)
+ ).with_output_types(ctor)
elif isinstance(proxy, pd.Series):
# Raise a TypeError if proxy has an unknown type
output_type = _dtype_to_fieldtype(proxy.dtype)
diff --git a/sdks/python/apache_beam/dataframe/schemas_test.py b/sdks/python/apache_beam/dataframe/schemas_test.py
index 8b1159c..af30e25 100644
--- a/sdks/python/apache_beam/dataframe/schemas_test.py
+++ b/sdks/python/apache_beam/dataframe/schemas_test.py
@@ -36,6 +36,8 @@ from apache_beam.dataframe import transforms
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
+from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import
match_is_named_tuple
Simple = typing.NamedTuple(
'Simple', [('name', unicode), ('id', int), ('height', float)])
@@ -65,41 +67,59 @@ def matches_df(expected):
# dtype. For example:
# pd.Series([b'abc'], dtype=bytes).dtype != 'S'
# pd.Series([b'abc'], dtype=bytes).astype(bytes).dtype == 'S'
+# (test data, pandas_type, column_name, beam_type)
COLUMNS = [
- ([375, 24, 0, 10, 16], np.int32, 'i32'),
- ([375, 24, 0, 10, 16], np.int64, 'i64'),
- ([375, 24, None, 10, 16], pd.Int32Dtype(), 'i32_nullable'),
- ([375, 24, None, 10, 16], pd.Int64Dtype(), 'i64_nullable'),
- ([375., 24., None, 10., 16.], np.float64, 'f64'),
- ([375., 24., None, 10., 16.], np.float32, 'f32'),
- ([True, False, True, True, False], bool, 'bool'),
- (['Falcon', 'Ostrich', None, 3.14, 0], object, 'any'),
- ([True, False, True, None, False], pd.BooleanDtype(), 'bool_nullable'),
+ ([375, 24, 0, 10, 16], np.int32, 'i32', np.int32),
+ ([375, 24, 0, 10, 16], np.int64, 'i64', np.int64),
+ ([375, 24, None, 10, 16],
+ pd.Int32Dtype(),
+ 'i32_nullable',
+ typing.Optional[np.int32]),
+ ([375, 24, None, 10, 16],
+ pd.Int64Dtype(),
+ 'i64_nullable',
+ typing.Optional[np.int64]),
+ ([375., 24., None, 10., 16.],
+ np.float64,
+ 'f64',
+ typing.Optional[np.float64]),
+ ([375., 24., None, 10., 16.],
+ np.float32,
+ 'f32',
+ typing.Optional[np.float32]),
+ ([True, False, True, True, False], bool, 'bool', bool),
+ (['Falcon', 'Ostrich', None, 3.14, 0], object, 'any', typing.Any),
+ ([True, False, True, None, False],
+ pd.BooleanDtype(),
+ 'bool_nullable',
+ typing.Optional[bool]),
(['Falcon', 'Ostrich', None, 'Aardvark', 'Elephant'],
pd.StringDtype(),
- 'strdtype'),
-] # type: typing.List[typing.Tuple[typing.List[typing.Any], typing.Any, str]]
+ 'strdtype',
+ typing.Optional[str]),
+] # type: typing.List[typing.Tuple[typing.List[typing.Any], typing.Any, str,
typing.Any]]
-NICE_TYPES_DF = pd.DataFrame(columns=[name for _, _, name in COLUMNS])
-for arr, dtype, name in COLUMNS:
+NICE_TYPES_DF = pd.DataFrame(columns=[name for _, _, name, _ in COLUMNS])
+for arr, dtype, name, _ in COLUMNS:
NICE_TYPES_DF[name] = pd.Series(arr, dtype=dtype, name=name).astype(dtype)
NICE_TYPES_PROXY = NICE_TYPES_DF[:0]
-SERIES_TESTS = [(pd.Series(arr, dtype=dtype, name=name), arr) for arr,
- dtype,
- name in COLUMNS]
+SERIES_TESTS = [(pd.Series(arr, dtype=dtype, name=name), arr, beam_type)
+ for (arr, dtype, name, beam_type) in COLUMNS]
_TEST_ARRAYS = [
- arr for arr, _, _ in COLUMNS
+ arr for (arr, _, _, _) in COLUMNS
] # type: typing.List[typing.List[typing.Any]]
DF_RESULT = list(zip(*_TEST_ARRAYS))
-INDEX_DF_TESTS = [
- (NICE_TYPES_DF.set_index([name for _, _, name in COLUMNS[:i]]), DF_RESULT)
- for i in range(1, len(COLUMNS) + 1)
-]
+BEAM_SCHEMA = typing.NamedTuple( # type: ignore
+ 'BEAM_SCHEMA', [(name, beam_type) for _, _, name, beam_type in COLUMNS])
+INDEX_DF_TESTS = [(
+ NICE_TYPES_DF.set_index([name for _, _, name, _ in COLUMNS[:i]]),
+ DF_RESULT,
+ BEAM_SCHEMA) for i in range(1, len(COLUMNS) + 1)]
-NOINDEX_DF_TESTS = [(NICE_TYPES_DF, DF_RESULT)]
+NOINDEX_DF_TESTS = [(NICE_TYPES_DF, DF_RESULT, BEAM_SCHEMA)]
PD_VERSION = tuple(int(n) for n in pd.__version__.split('.'))
@@ -203,8 +223,18 @@ class SchemasTest(unittest.TestCase):
proxy=schemas.generate_proxy(Animal)))
assert_that(res, equal_to([('Falcon', 375.), ('Parrot', 25.)]))
+ def assert_typehints_equal(self, left, right):
+ left = typehints.normalize(left)
+ right = typehints.normalize(right)
+
+ if match_is_named_tuple(left):
+ self.assertTrue(match_is_named_tuple(right))
+ self.assertEqual(left.__annotations__, right.__annotations__)
+ else:
+ self.assertEqual(left, right)
+
@parameterized.expand(SERIES_TESTS + NOINDEX_DF_TESTS)
- def test_unbatch_no_index(self, df_or_series, rows):
+ def test_unbatch_no_index(self, df_or_series, rows, beam_type):
proxy = df_or_series[:0]
with TestPipeline() as p:
@@ -212,10 +242,15 @@ class SchemasTest(unittest.TestCase):
p | beam.Create([df_or_series[::2], df_or_series[1::2]])
| schemas.UnbatchPandas(proxy))
+ # Verify that the unbatched PCollection has the expected typehint
+ # TODO(BEAM-8538): typehints should support NamedTuple so we can use
+ # typehints.is_consistent_with here instead
+ self.assert_typehints_equal(res.element_type, beam_type)
+
assert_that(res, equal_to(rows))
@parameterized.expand(SERIES_TESTS + INDEX_DF_TESTS)
- def test_unbatch_with_index(self, df_or_series, rows):
+ def test_unbatch_with_index(self, df_or_series, rows, _):
proxy = df_or_series[:0]
with TestPipeline() as p:
diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py
index 2b738bf..d77727e 100644
--- a/sdks/python/apache_beam/typehints/native_type_compatibility.py
+++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py
@@ -107,7 +107,7 @@ def _match_is_exactly_iterable(user_type):
return getattr(user_type, '__origin__', None) is expected_origin
-def _match_is_named_tuple(user_type):
+def match_is_named_tuple(user_type):
return (
_safe_issubclass(user_type, typing.Tuple) and
hasattr(user_type, '_field_types'))
@@ -234,7 +234,7 @@ def convert_to_beam_type(typ):
# We just convert it to Any for now.
# This MUST appear before the entry for the normal Tuple.
_TypeMapEntry(
- match=_match_is_named_tuple, arity=0, beam_type=typehints.Any),
+ match=match_is_named_tuple, arity=0, beam_type=typehints.Any),
_TypeMapEntry(
match=_match_issubclass(typing.Tuple),
arity=-1,
diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py
index 6a299bd..5daf68a 100644
--- a/sdks/python/apache_beam/typehints/schemas.py
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -70,10 +70,10 @@ from apache_beam.portability.api import schema_pb2
from apache_beam.typehints import row_type
from apache_beam.typehints.native_type_compatibility import _get_args
from apache_beam.typehints.native_type_compatibility import
_match_is_exactly_mapping
-from apache_beam.typehints.native_type_compatibility import
_match_is_named_tuple
from apache_beam.typehints.native_type_compatibility import _match_is_optional
from apache_beam.typehints.native_type_compatibility import _safe_issubclass
from apache_beam.typehints.native_type_compatibility import
extract_optional_type
+from apache_beam.typehints.native_type_compatibility import
match_is_named_tuple
from apache_beam.utils import proto_utils
from apache_beam.utils.timestamp import Timestamp
@@ -148,7 +148,7 @@ def named_fields_from_schema(
def typing_to_runner_api(type_):
- if _match_is_named_tuple(type_):
+ if match_is_named_tuple(type_):
schema = None
if hasattr(type_, _BEAM_SCHEMA_ID):
schema = SCHEMA_REGISTRY.get_schema_by_id(getattr(type_,
_BEAM_SCHEMA_ID))
@@ -287,7 +287,7 @@ def schema_from_element_type(element_type): # (type) ->
schema_pb2.Schema
# TODO(BEAM-10722): Make sure beam.Row generated schemas are registered and
# de-duped
return named_fields_to_schema(element_type._fields)
- elif _match_is_named_tuple(element_type):
+ elif match_is_named_tuple(element_type):
return named_tuple_to_schema(element_type)
else:
raise TypeError(