[
https://issues.apache.org/jira/browse/BEAM-9546?focusedWorklogId=470894&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-470894
]
ASF GitHub Bot logged work on BEAM-9546:
----------------------------------------
Author: ASF GitHub Bot
Created on: 14/Aug/20 21:32
Start Date: 14/Aug/20 21:32
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #11980:
URL: https://github.com/apache/beam/pull/11980#discussion_r470852017
##########
File path: sdks/python/apache_beam/dataframe/schemas.py
##########
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for relating schema-aware PCollections and dataframe transforms.
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from typing import NamedTuple
+from typing import TypeVar
+
+import pandas as pd
+
+from apache_beam import typehints
+from apache_beam.transforms.core import DoFn
+from apache_beam.transforms.core import ParDo
+from apache_beam.transforms.util import BatchElements
+from apache_beam.typehints.schemas import named_fields_from_element_type
+
+__all__ = ('BatchRowsAsDataFrame', 'generate_proxy')
+
+T = TypeVar('T', bound=NamedTuple)
+
+
[email protected]_input_types(T)
[email protected]_output_types(pd.DataFrame)
+class BatchRowsAsDataFrame(BatchElements):
+ """A transform that batches schema-aware PCollection elements into DataFrames
+
+ Batching parameters are inherited from
+ :class:`~apache_beam.transforms.util.BatchElements`.
+ """
+ def __init__(self, *args, **kwargs):
+ super(BatchRowsAsDataFrame, self).__init__(*args, **kwargs)
+ self._batch_elements_transform = BatchElements(*args, **kwargs)
+
+ def expand(self, pcoll):
+ return super(BatchRowsAsDataFrame, self).expand(pcoll) | ParDo(
Review comment:
Rather than subclassing, it would probably be cleaner to make this just
a PTransform whose expand method returns
`pcoll | BatchElements(...) | ParDo(...)`.
If you want to accept all the parameters from BatchElements, you could
construct the BatchElements instance in your constructor.
##########
File path: sdks/python/apache_beam/dataframe/schemas.py
##########
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for relating schema-aware PCollections and dataframe transforms.
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from typing import NamedTuple
+from typing import TypeVar
+
+import pandas as pd
+
+from apache_beam import typehints
+from apache_beam.transforms.core import DoFn
+from apache_beam.transforms.core import ParDo
+from apache_beam.transforms.util import BatchElements
+from apache_beam.typehints.schemas import named_fields_from_element_type
+
+__all__ = ('BatchRowsAsDataFrame', 'generate_proxy')
+
+T = TypeVar('T', bound=NamedTuple)
+
+
[email protected]_input_types(T)
[email protected]_output_types(pd.DataFrame)
+class BatchRowsAsDataFrame(BatchElements):
+ """A transform that batches schema-aware PCollection elements into DataFrames
+
+ Batching parameters are inherited from
+ :class:`~apache_beam.transforms.util.BatchElements`.
+ """
+ def __init__(self, *args, **kwargs):
+ super(BatchRowsAsDataFrame, self).__init__(*args, **kwargs)
+ self._batch_elements_transform = BatchElements(*args, **kwargs)
+
+ def expand(self, pcoll):
+ return super(BatchRowsAsDataFrame, self).expand(pcoll) | ParDo(
+ _RowBatchToDataFrameDoFn(pcoll.element_type))
+
+
+class _RowBatchToDataFrameDoFn(DoFn):
Review comment:
Rather than letting this be a full DoFn, you could just let columns be a
local variable in the map above, and then write
`... | Map(lambda batch: pd.DataFrame.from_records(batch, columns=columns))`
##########
File path: sdks/python/apache_beam/typehints/schemas.py
##########
@@ -251,3 +258,23 @@ def named_tuple_from_schema(schema):
def named_tuple_to_schema(named_tuple):
return typing_to_runner_api(named_tuple).row_type.schema
+
+
+def schema_from_element_type(element_type): # (type) -> schema_pb2.Schema
+ """Get a schema for the given PCollection element_type.
+
+ Returns schema as a list of (name, python_type) tuples"""
+ if isinstance(element_type, row_type.RowTypeConstraint):
+ # TODO: Make sure beam.Row generated schemas are registered and de-duped
Review comment:
Is this worth a JIRA?
##########
File path: sdks/python/apache_beam/dataframe/convert.py
##########
@@ -36,7 +37,7 @@
# TODO: Or should this be called as_dataframe?
def to_dataframe(
pcoll, # type: pvalue.PCollection
- proxy, # type: pandas.core.generic.NDFrame
+ proxy=None, # type: pandas.core.generic.NDFrame
Review comment:
Woo hoo!
##########
File path: sdks/python/apache_beam/dataframe/transforms_test.py
##########
@@ -112,6 +133,64 @@ def test_scalar(self):
self.run_scenario(
df, lambda df: df.groupby('key').sum().val / df.val.agg(sum))
+ def test_batching_named_tuple_input(self):
+ with beam.Pipeline() as p:
+ result = (
+ p | beam.Create([
+ AnimalSpeed('Aardvark', 5),
+ AnimalSpeed('Ant', 2),
+ AnimalSpeed('Elephant', 35),
+ AnimalSpeed('Zebra', 40)
+ ]).with_output_types(AnimalSpeed)
+ | transforms.DataframeTransform(lambda df: df.filter(regex='A.*')))
Review comment:
This threw me because I was expecting the result to be `['Aardvark',
'Ant']`. I see now that it's filtering down to the column names that start with
A, but perhaps the filter could be written a bit differently to make it more
obvious (e.g. filter on the values, let the regex be 'Anim*', or use another
operation).
##########
File path: sdks/python/apache_beam/pvalue.py
##########
@@ -88,7 +88,7 @@ class PValue(object):
def __init__(self,
pipeline, # type: Pipeline
tag=None, # type: Optional[str]
- element_type=None, # type: Optional[object]
+ element_type=None, # type: Optional[type]
Review comment:
Or a type constraint. (Not all our type hints are types.)
##########
File path: sdks/python/apache_beam/dataframe/schemas.py
##########
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for relating schema-aware PCollections and dataframe transforms.
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from typing import NamedTuple
+from typing import TypeVar
+
+import pandas as pd
+
+from apache_beam import typehints
+from apache_beam.transforms.core import DoFn
Review comment:
Nit. We typically have the style of importing modules, and then using
qualified names (which results in less churn and makes it a bit easier to
figure out where things come from). Instead of importing from `core`, it's
typical to do `import apache_beam as beam` and use beam.DoFn, etc.
##########
File path: sdks/python/apache_beam/dataframe/schemas_test.py
##########
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for schemas."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import unittest
+from typing import NamedTuple
+
+import future.tests.base # pylint: disable=unused-import
+# patches unittest.testcase to be python3 compatible
+import pandas as pd
+from past.builtins import unicode
+
+import apache_beam as beam
+from apache_beam.coders import RowCoder
+from apache_beam.coders.typecoders import registry as coders_registry
+from apache_beam.dataframe import schemas
+from apache_beam.dataframe import transforms
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+
+Simple = NamedTuple(
+ 'Simple', [('name', unicode), ('id', int), ('height', float)])
+coders_registry.register_coder(Simple, RowCoder)
+Animal = NamedTuple('Animal', [('animal', unicode), ('max_speed', float)])
+coders_registry.register_coder(Animal, RowCoder)
+
+
+def matches_df(expected):
+ def check_df_pcoll_equal(actual):
+ actual = pd.concat(actual)
+ sorted_actual = actual.sort_values(by=list(actual.columns)).reset_index(
+ drop=True)
+ sorted_expected = expected.sort_values(
+ by=list(expected.columns)).reset_index(drop=True)
+ if not sorted_actual.equals(sorted_expected):
+ raise AssertionError(
+ 'Dataframes not equal: \n\nActual:\n%s\n\nExpected:\n%s' %
+ (sorted_actual, sorted_expected))
+
+ return check_df_pcoll_equal
+
+
+class SchemasTest(unittest.TestCase):
+ def test_simple_df(self):
+ expected = pd.DataFrame({
+ 'name': list(unicode(i) for i in range(5)),
+ 'id': list(range(5)),
+ 'height': list(float(i) for i in range(5))
+ },
+ columns=['name', 'id', 'height'])
+
+ with TestPipeline() as p:
+ res = (
+ p
+ | beam.Create([
+ Simple(name=unicode(i), id=i, height=float(i)) for i in range(5)
+ ])
+ | schemas.BatchRowsAsDataFrame(min_batch_size=10, max_batch_size=10))
+ assert_that(res, matches_df(expected))
+
+ def test_generate_proxy(self):
+ expected = pd.DataFrame({
+ 'animal': pd.Series(dtype=unicode), 'max_speed': pd.Series(dtype=float)
+ })
+
+ self.assertTrue(schemas.generate_proxy(Animal).equals(expected))
+
+ def test_batch_with_df_transform(self):
Review comment:
Maybe do a test using to_dataframe?
##########
File path: sdks/python/apache_beam/dataframe/schemas_test.py
##########
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for schemas."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import unittest
+from typing import NamedTuple
+
+import future.tests.base # pylint: disable=unused-import
+# patches unittest.testcase to be python3 compatible
+import pandas as pd
+from past.builtins import unicode
+
+import apache_beam as beam
+from apache_beam.coders import RowCoder
+from apache_beam.coders.typecoders import registry as coders_registry
+from apache_beam.dataframe import schemas
+from apache_beam.dataframe import transforms
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+
+Simple = NamedTuple(
+ 'Simple', [('name', unicode), ('id', int), ('height', float)])
+coders_registry.register_coder(Simple, RowCoder)
+Animal = NamedTuple('Animal', [('animal', unicode), ('max_speed', float)])
+coders_registry.register_coder(Animal, RowCoder)
+
+
+def matches_df(expected):
+ def check_df_pcoll_equal(actual):
+ actual = pd.concat(actual)
+ sorted_actual = actual.sort_values(by=list(actual.columns)).reset_index(
+ drop=True)
+ sorted_expected = expected.sort_values(
+ by=list(expected.columns)).reset_index(drop=True)
+ if not sorted_actual.equals(sorted_expected):
+ raise AssertionError(
+ 'Dataframes not equal: \n\nActual:\n%s\n\nExpected:\n%s' %
+ (sorted_actual, sorted_expected))
+
+ return check_df_pcoll_equal
+
+
+class SchemasTest(unittest.TestCase):
+ def test_simple_df(self):
+ expected = pd.DataFrame({
+ 'name': list(unicode(i) for i in range(5)),
+ 'id': list(range(5)),
+ 'height': list(float(i) for i in range(5))
+ },
+ columns=['name', 'id', 'height'])
+
+ with TestPipeline() as p:
+ res = (
+ p
+ | beam.Create([
+ Simple(name=unicode(i), id=i, height=float(i)) for i in range(5)
+ ])
+ | schemas.BatchRowsAsDataFrame(min_batch_size=10, max_batch_size=10))
+ assert_that(res, matches_df(expected))
+
+ def test_generate_proxy(self):
+ expected = pd.DataFrame({
+ 'animal': pd.Series(dtype=unicode), 'max_speed': pd.Series(dtype=float)
+ })
+
+ self.assertTrue(schemas.generate_proxy(Animal).equals(expected))
+
+ def test_batch_with_df_transform(self):
+ with TestPipeline() as p:
+ res = (
+ p
+ | beam.Create([
+ Animal('Falcon', 380.0),
+ Animal('Falcon', 370.0),
+ Animal('Parrot', 24.0),
+ Animal('Parrot', 26.0)
+ ])
+ | schemas.BatchRowsAsDataFrame()
+ | transforms.DataframeTransform(
+ lambda df: df.groupby('animal').mean(),
+ proxy=schemas.generate_proxy(Animal)))
Review comment:
Can we get rid of this one too? (Or at least drop a TODO to do it in a
subsequent PR?)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 470894)
Time Spent: 0.5h (was: 20m)
> Support for batching a schema-aware PCollection and processing as a Dataframe
> -----------------------------------------------------------------------------
>
> Key: BEAM-9546
> URL: https://issues.apache.org/jira/browse/BEAM-9546
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)