This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b781b82842a [Bug] fix fillna function on a single column fail (#32594)
b781b82842a is described below
commit b781b82842a11e79396fa6177a8944d0f50c68d5
Author: DKPHUONG <[email protected]>
AuthorDate: Wed Oct 9 21:01:42 2024 +0700
[Bug] fix fillna function on a single column fail (#32594)
* fix bug all arg add as inputs
* fix bug for fillna
* Revert "fix bug for fillna"
This reverts commit 2a5736c8b4af8ffcac6336a79f759f73da67dad1.
* fix bug for fillna
* add test for fillna a column
* add test for fillna a column
* add test for fillna a column
* revert add test to frames_test
* Move test from transforms to frames
---
sdks/python/apache_beam/dataframe/frames_test.py | 11 +++++++++++
sdks/python/apache_beam/dataframe/transforms.py | 6 +++++-
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 076ab504add..55d9fc5f4df 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -1025,6 +1025,17 @@ class DeferredFrameTest(_AbstractFrameTest):
self._run_test(lambda df, df2: df.A.fillna(df2.A), df, df2)
+ def test_dataframe_column_fillna_constant_as_value(self):
+ from apache_beam.dataframe import convert
+ from apache_beam.testing.util import assert_that
+ from apache_beam.testing.util import equal_to
+ with beam.Pipeline(None) as p:
+ pcoll = (
+ p | beam.Create([1.0, np.nan, -1.0]) | beam.Select(x=lambda x: x))
+ df = convert.to_dataframe(pcoll)
+ df_new = df['x'].fillna(0)
+ assert_that(convert.to_pcollection(df_new), equal_to([1.0, 0.0, -1.0]))
+
@unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0')
def test_append_verify_integrity(self):
df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py
index 852b49c4e2e..d0b5be4eb2a 100644
--- a/sdks/python/apache_beam/dataframe/transforms.py
+++ b/sdks/python/apache_beam/dataframe/transforms.py
@@ -395,7 +395,11 @@ class _DataframeExpressionsTransform(transforms.PTransform):
if stage is None:
# No stage available, compute this expression as part of a new stage.
- stage = Stage(expr.args(), expr.requires_partition_by())
+ stage = Stage([
+ arg for arg in expr.args()
+ if not isinstance(arg, expressions.ConstantExpression)
+ ],
+ expr.requires_partition_by())
for arg in expr.args():
# For each argument, declare that it is also available in
# this new stage.