TheNeuralBit commented on a change in pull request #12882:
URL: https://github.com/apache/beam/pull/12882#discussion_r492328919



##########
File path: sdks/python/apache_beam/dataframe/schemas.py
##########
@@ -55,17 +159,149 @@ def expand(self, pcoll):
         lambda batch: pd.DataFrame.from_records(batch, columns=columns))
 
 
-def _make_empty_series(name, typ):
-  try:
-    return pd.Series(name=name, dtype=typ)
-  except TypeError:
-    raise TypeError("Unable to convert type '%s' for field '%s'" % (name, typ))
+def _make_proxy_series(name, typehint):
+  # Default to np.object. This is lossy; we won't be able to recover the type
+  # at the output.
+  dtype = BEAM_TO_PANDAS.get(typehint, np.object)
+
+  return pd.Series(name=name, dtype=dtype)
 
 
 def generate_proxy(element_type):
   # type: (type) -> pd.DataFrame
-  return pd.DataFrame({
-      name: _make_empty_series(name, typ)
-      for name,
-      typ in named_fields_from_element_type(element_type)
-  })
+
+  """ Generate a proxy pandas object for the given PCollection element_type.
+
+  Currently only supports generating a DataFrame proxy from a schema-aware
+  PCollection."""
+  fields = named_fields_from_element_type(element_type)
+  return pd.DataFrame(
+      {name: _make_proxy_series(name, typehint)
+       for name, typehint in fields},
+      columns=[name for name, _ in fields])
+
+
+def element_type_from_proxy(proxy):

Review comment:
       Done
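
For readers following along: a minimal sketch of how the new generate_proxy
might be exercised, assuming a schema'd NamedTuple element type (the Animal
type and its fields here are invented for illustration):

    import typing

    import numpy as np
    import pandas as pd

    from apache_beam.dataframe import schemas

    # Hypothetical element type; any NamedTuple with typed fields should do.
    class Animal(typing.NamedTuple):
      name: str
      legs: np.int64

    proxy = schemas.generate_proxy(Animal)
    assert isinstance(proxy, pd.DataFrame)
    assert list(proxy.columns) == ['name', 'legs']
    assert len(proxy) == 0  # the proxy carries dtypes only, no data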

##########
File path: sdks/python/apache_beam/dataframe/schemas.py
##########
@@ -55,17 +159,149 @@ def expand(self, pcoll):
         lambda batch: pd.DataFrame.from_records(batch, columns=columns))
 
 
-def _make_empty_series(name, typ):
-  try:
-    return pd.Series(name=name, dtype=typ)
-  except TypeError:
-    raise TypeError("Unable to convert type '%s' for field '%s'" % (name, typ))
+def _make_proxy_series(name, typehint):
+  # Default to np.object. This is lossy; we won't be able to recover the type
+  # at the output.
+  dtype = BEAM_TO_PANDAS.get(typehint, np.object)
+
+  return pd.Series(name=name, dtype=dtype)
 
 
 def generate_proxy(element_type):
   # type: (type) -> pd.DataFrame
-  return pd.DataFrame({
-      name: _make_empty_series(name, typ)
-      for name,
-      typ in named_fields_from_element_type(element_type)
-  })
+
+  """ Generate a proxy pandas object for the given PCollection element_type.

Review comment:
       Done

##########
File path: sdks/python/apache_beam/dataframe/schemas.py
##########
@@ -55,17 +159,149 @@ def expand(self, pcoll):
         lambda batch: pd.DataFrame.from_records(batch, columns=columns))
 
 
-def _make_empty_series(name, typ):
-  try:
-    return pd.Series(name=name, dtype=typ)
-  except TypeError:
-    raise TypeError("Unable to convert type '%s' for field '%s'" % (name, typ))
+def _make_proxy_series(name, typehint):
+  # Default to np.object. This is lossy; we won't be able to recover the type
+  # at the output.
+  dtype = BEAM_TO_PANDAS.get(typehint, np.object)
+

Review comment:
       Removed
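
BEAM_TO_PANDAS itself is not shown in this hunk. Going only by the mapping
table in the module docstring quoted below, an illustrative reconstruction
might look like this (a sketch, not the actual constant in schemas.py):

    from typing import Optional

    import numpy as np
    import pandas as pd

    # Illustrative typehint -> dtype mapping per the module docstring;
    # the real table also covers the 8- and 16-bit nullable integers.
    BEAM_TO_PANDAS = {
        np.int8: np.int8,
        np.int16: np.int16,
        np.int32: np.int32,
        np.int64: np.int64,
        Optional[np.int32]: pd.Int32Dtype(),
        Optional[np.int64]: pd.Int64Dtype(),
        np.float32: np.float32,
        np.float64: np.float64,
        bytes: np.dtype('S'),
        bool: np.bool_,
    }

    # Unknown typehints fall through to np.object, as in _make_proxy_series:
    dtype = BEAM_TO_PANDAS.get(Optional[np.int64], np.object_)  # pd.Int64Dtype()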

##########
File path: sdks/python/apache_beam/dataframe/schemas.py
##########
@@ -15,25 +15,129 @@
 # limitations under the License.
 #
 
-"""Utilities for relating schema-aware PCollections and dataframe transforms.
+r"""Utilities for relating schema-aware PCollections and dataframe transforms.
+
+pandas dtype               Python typing
+np.int{8,16,32,64}      <-----> np.int{8,16,32,64}*
+pd.Int{8,16,32,64}Dtype <-----> Optional[np.int{8,16,32,64}]*
+np.float{32,64}         <-----> Optional[np.float{32,64}]
+                           \--- np.float{32,64}
+np.dtype('S')           <-----> bytes
+Not supported           <------ Optional[bytes]
+np.bool                 <-----> np.bool
+
+* int, float, bool are treated the same as np.int64, np.float64, np.bool
+
+Any unknown or unsupported types are treated as Any and shunted to
+np.object:
+
+np.object               <-----> Any
+
+Strings and nullable Booleans are handled differently when using pandas 0.x vs.
+1.x. pandas 0.x has no mapping for these types, so they are shunted lossily to
+np.object.
+
+pandas 0.x:
+np.object         <------ Optional[bool]
+                     \--- Optional[str]
+                      \-- str
+
+pandas 1.x:
+pd.BooleanDType() <-----> Optional[bool]
+pd.StringDType()  <-----> Optional[str]
+                     \--- str
+
+pandas does not support hierarchical data natively. All structured types
+(Sequence, Mapping, nested NamedTuple types) will be shunted lossily to
+np.object/Any.
+
+TODO: Mapping for date/time types
+https://pandas.pydata.org/docs/user_guide/timeseries.html#overview
+
+timestamps and timedeltas in pandas always use nanosecond precision
 """
 
 # pytype: skip-file
 
 from __future__ import absolute_import
 
-import typing
+from typing import Any
+from typing import NamedTuple
+from typing import Optional
+from typing import TypeVar
+from typing import Union
 
+import numpy as np
 import pandas as pd
 
 import apache_beam as beam
 from apache_beam import typehints
+from apache_beam.portability.api import schema_pb2
 from apache_beam.transforms.util import BatchElements
+from apache_beam.typehints.native_type_compatibility import _match_is_optional
 from apache_beam.typehints.schemas import named_fields_from_element_type
+from apache_beam.typehints.schemas import named_fields_to_schema
+from apache_beam.typehints.schemas import named_tuple_from_schema
+from apache_beam.typehints.schemas import named_tuple_to_schema
+from apache_beam.utils import proto_utils
+
+__all__ = (
+    'BatchRowsAsDataFrame',
+    'generate_proxy',
+    'UnbatchPandas',
+    'element_type_from_proxy')
+
+T = TypeVar('T', bound=NamedTuple)
+
+PD_MAJOR, _, _ = map(int, pd.__version__.split('.'))

Review comment:
       Fixed, thanks
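
To make the table above concrete, a quick sketch of the expected behavior on
pandas 1.x (the Reading type is hypothetical; the field dtypes follow the
mapping described in the docstring):

    import typing

    import numpy as np

    from apache_beam.dataframe import schemas

    # Hypothetical schema with one plain and one nullable integer field.
    class Reading(typing.NamedTuple):
      sensor: np.int64
      value: typing.Optional[np.int64]

    proxy = schemas.generate_proxy(Reading)
    # Per the docstring table: np.int64 stays int64, while Optional[np.int64]
    # maps to the nullable extension dtype pd.Int64Dtype ('Int64').
    print(proxy.dtypes)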




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

