This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e488f41b9bd Memoize some dataframes analysis operations. (#31377)
e488f41b9bd is described below
commit e488f41b9bde0c16f396eefe0a9ab6e1adcfe5eb
Author: Robert Bradshaw <[email protected]>
AuthorDate: Tue May 28 12:07:12 2024 -0700
Memoize some dataframes analysis operations. (#31377)
ReadFromCsv with an explicit dtype produced graphs that had quadratic
traversal (though the computed results, which are sets, were always correct).
This fixes https://github.com/apache/beam/issues/31152 and should help
other deep expressions with common references as well.
---
sdks/python/apache_beam/dataframe/expressions.py | 6 ++++--
sdks/python/apache_beam/dataframe/io_test.py | 9 +++++++++
sdks/python/apache_beam/dataframe/transforms.py | 1 +
3 files changed, 14 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/expressions.py
b/sdks/python/apache_beam/dataframe/expressions.py
index ae08cdaf54c..91d237c7de9 100644
--- a/sdks/python/apache_beam/dataframe/expressions.py
+++ b/sdks/python/apache_beam/dataframe/expressions.py
@@ -365,8 +365,10 @@ class ComputedExpression(Expression):
self._preserves_partition_by = preserves_partition_by
def placeholders(self):
- return frozenset.union(
- frozenset(), *[arg.placeholders() for arg in self.args()])
+ if not hasattr(self, '_placeholders'):
+ self._placeholders = frozenset.union(
+ frozenset(), *[arg.placeholders() for arg in self.args()])
+ return self._placeholders
def args(self):
return self._args
diff --git a/sdks/python/apache_beam/dataframe/io_test.py
b/sdks/python/apache_beam/dataframe/io_test.py
index 782dac53e2c..92bb10225c7 100644
--- a/sdks/python/apache_beam/dataframe/io_test.py
+++ b/sdks/python/apache_beam/dataframe/io_test.py
@@ -117,6 +117,15 @@ A B
self.assertCountEqual(['a,b,c', '1,2,3', '3,4,7'],
set(self.read_all_lines(output + 'out.csv*')))
+ def test_wide_csv_with_dtypes(self):
+ # Verify https://github.com/apache/beam/issues/31152 is resolved.
+ cols = ','.join(f'col{ix}' for ix in range(123))
+ data = ','.join(str(ix) for ix in range(123))
+ input = self.temp_dir({'tmp.csv': f'{cols}\n{data}'})
+ with beam.Pipeline() as p:
+ pcoll = p | beam.io.ReadFromCsv(f'{input}tmp.csv', dtype=str)
+ assert_that(pcoll | beam.Map(max), equal_to(['99']))
+
def test_sharding_parameters(self):
data = pd.DataFrame({'label': ['11a', '37a', '389a'], 'rank': [0, 1, 2]})
output = self.temp_dir()
diff --git a/sdks/python/apache_beam/dataframe/transforms.py
b/sdks/python/apache_beam/dataframe/transforms.py
index 2e815408314..852b49c4e2e 100644
--- a/sdks/python/apache_beam/dataframe/transforms.py
+++ b/sdks/python/apache_beam/dataframe/transforms.py
@@ -302,6 +302,7 @@ class _DataframeExpressionsTransform(transforms.PTransform):
self.outputs))
# First define some helper functions.
+ @_memoize
def output_partitioning_in_stage(expr, stage):
"""Return the output partitioning of expr when computed in stage,
or returns None if the expression cannot be computed in this stage.