This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e488f41b9bd Memoize some dataframes analysis operations. (#31377)
e488f41b9bd is described below

commit e488f41b9bde0c16f396eefe0a9ab6e1adcfe5eb
Author: Robert Bradshaw <[email protected]>
AuthorDate: Tue May 28 12:07:12 2024 -0700

    Memoize some dataframes analysis operations. (#31377)
    
    ReadFromCsv with an explicit dtype produced graphs that had quadratic
    traversal (though the computed results, sets, were always correct).
    
    This fixes https://github.com/apache/beam/issues/31152 and should help
    other deep expressions with common references as well.
---
 sdks/python/apache_beam/dataframe/expressions.py | 6 ++++--
 sdks/python/apache_beam/dataframe/io_test.py     | 9 +++++++++
 sdks/python/apache_beam/dataframe/transforms.py  | 1 +
 3 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py
index ae08cdaf54c..91d237c7de9 100644
--- a/sdks/python/apache_beam/dataframe/expressions.py
+++ b/sdks/python/apache_beam/dataframe/expressions.py
@@ -365,8 +365,10 @@ class ComputedExpression(Expression):
     self._preserves_partition_by = preserves_partition_by
 
   def placeholders(self):
-    return frozenset.union(
-        frozenset(), *[arg.placeholders() for arg in self.args()])
+    if not hasattr(self, '_placeholders'):
+      self._placeholders = frozenset.union(
+          frozenset(), *[arg.placeholders() for arg in self.args()])
+    return self._placeholders
 
   def args(self):
     return self._args
diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py
index 782dac53e2c..92bb10225c7 100644
--- a/sdks/python/apache_beam/dataframe/io_test.py
+++ b/sdks/python/apache_beam/dataframe/io_test.py
@@ -117,6 +117,15 @@ A     B
     self.assertCountEqual(['a,b,c', '1,2,3', '3,4,7'],
                           set(self.read_all_lines(output + 'out.csv*')))
 
+  def test_wide_csv_with_dtypes(self):
+    # Verify https://github.com/apache/beam/issues/31152 is resolved.
+    cols = ','.join(f'col{ix}' for ix in range(123))
+    data = ','.join(str(ix) for ix in range(123))
+    input = self.temp_dir({'tmp.csv': f'{cols}\n{data}'})
+    with beam.Pipeline() as p:
+      pcoll = p | beam.io.ReadFromCsv(f'{input}tmp.csv', dtype=str)
+      assert_that(pcoll | beam.Map(max), equal_to(['99']))
+
   def test_sharding_parameters(self):
     data = pd.DataFrame({'label': ['11a', '37a', '389a'], 'rank': [0, 1, 2]})
     output = self.temp_dir()
diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py
index 2e815408314..852b49c4e2e 100644
--- a/sdks/python/apache_beam/dataframe/transforms.py
+++ b/sdks/python/apache_beam/dataframe/transforms.py
@@ -302,6 +302,7 @@ class _DataframeExpressionsTransform(transforms.PTransform):
                 self.outputs))
 
     # First define some helper functions.
+    @_memoize
     def output_partitioning_in_stage(expr, stage):
       """Return the output partitioning of expr when computed in stage,
       or returns None if the expression cannot be computed in this stage.

Reply via email to