This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b781b82842a [Bug] fix fillna function on a single column fail (#32594)
b781b82842a is described below

commit b781b82842a11e79396fa6177a8944d0f50c68d5
Author: DKPHUONG <[email protected]>
AuthorDate: Wed Oct 9 21:01:42 2024 +0700

    [Bug] fix fillna function on a single column fail (#32594)
    
    * fix bug all arg add as inputs
    
    * fix bug for fillna
    
    * Revert "fix bug for fillna"
    
    This reverts commit 2a5736c8b4af8ffcac6336a79f759f73da67dad1.
    
    * fix bug for fillna
    
    * add test for fillna a column
    
    * add test for fillna a column
    
    * add test for fillna a column
    
    * revert add test to frames_test
    
    * Move test from transforms to frames
---
 sdks/python/apache_beam/dataframe/frames_test.py | 11 +++++++++++
 sdks/python/apache_beam/dataframe/transforms.py  |  6 +++++-
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 076ab504add..55d9fc5f4df 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -1025,6 +1025,17 @@ class DeferredFrameTest(_AbstractFrameTest):
 
     self._run_test(lambda df, df2: df.A.fillna(df2.A), df, df2)
 
+  def test_dataframe_column_fillna_constant_as_value(self):
+    from apache_beam.dataframe import convert
+    from apache_beam.testing.util import assert_that
+    from apache_beam.testing.util import equal_to
+    with beam.Pipeline(None) as p:
+      pcoll = (
+          p | beam.Create([1.0, np.nan, -1.0]) | beam.Select(x=lambda x: x))
+      df = convert.to_dataframe(pcoll)
+      df_new = df['x'].fillna(0)
+      assert_that(convert.to_pcollection(df_new), equal_to([1.0, 0.0, -1.0]))
+
   @unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0')
   def test_append_verify_integrity(self):
     df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py
index 852b49c4e2e..d0b5be4eb2a 100644
--- a/sdks/python/apache_beam/dataframe/transforms.py
+++ b/sdks/python/apache_beam/dataframe/transforms.py
@@ -395,7 +395,11 @@ class _DataframeExpressionsTransform(transforms.PTransform):
 
       if stage is None:
         # No stage available, compute this expression as part of a new stage.
-        stage = Stage(expr.args(), expr.requires_partition_by())
+        stage = Stage([
+            arg for arg in expr.args()
+            if not isinstance(arg, expressions.ConstantExpression)
+        ],
+                      expr.requires_partition_by())
         for arg in expr.args():
           # For each argument, declare that it is also available in
           # this new stage.

Reply via email to