This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 081cb9a  [BEAM-12701] Added extra parameter in to_csv for DeferredFrame to name the PTransform label (#15450)
081cb9a is described below

commit 081cb9a51384bba1e66bb60bf1d61e84e817b4e1
Author: Eduardo Sánchez López <[email protected]>
AuthorDate: Thu Sep 16 19:27:23 2021 +0200

    [BEAM-12701] Added extra parameter in to_csv for DeferredFrame to name the PTransform label (#15450)
    
    * [BEAM-12701] Added extra parameter in to_csv for DeferredFrame to name the PTransform label
    
    * Modified changelog type from known issues to bugfixing
    
    * Sensible transform label and linter fix
    
    * Test fix - Typing tuple row
---
 CHANGES.md                                   |  2 ++
 sdks/python/apache_beam/dataframe/io.py      | 13 ++++++++-----
 sdks/python/apache_beam/dataframe/io_test.py | 29 ++++++++++++++++++++++++++++
 3 files changed, 39 insertions(+), 5 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index e6181da..2b3c370 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -89,6 +89,8 @@
 ## Bugfixes
 
 * Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Fixed error while writing multiple DeferredFrames to csv (Python)
+([BEAM-12701](https://issues.apache.org/jira/browse/BEAM-12701)).
 
 ## Known Issues
 
diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py
index 5046acc..5ef8e2c 100644
--- a/sdks/python/apache_beam/dataframe/io.py
+++ b/sdks/python/apache_beam/dataframe/io.py
@@ -74,16 +74,19 @@ def read_csv(path, *args, splittable=False, **kwargs):
       splitter=_CsvSplitter(args, kwargs) if splittable else None)
 
 
-def _as_pc(df):
+def _as_pc(df, label=None):
   from apache_beam.dataframe import convert  # avoid circular import
   # TODO(roberwb): Amortize the computation for multiple writes?
-  return convert.to_pcollection(df, yield_elements='pandas')
+  return convert.to_pcollection(df, yield_elements='pandas', label=label)
 
 
 @frame_base.with_docs_from(pd.DataFrame)
-def to_csv(df, path, *args, **kwargs):
-
-  return _as_pc(df) | _WriteToPandas(
+def to_csv(df, path, transform_label=None, *args, **kwargs):
+  label_pc = f"{transform_label} - ToPCollection" if transform_label \
+    else f"ToPCollection(df) - {path}"
+  label_pd = f"{transform_label} - ToPandasDataFrame" if transform_label \
+    else f"WriteToPandas(df) - {path}"
+  return _as_pc(df, label_pc) | label_pd >> _WriteToPandas(
       'to_csv', path, args, kwargs, incremental=True, binary=False)
 
 
diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py
index 374eb0c..060eebf 100644
--- a/sdks/python/apache_beam/dataframe/io_test.py
+++ b/sdks/python/apache_beam/dataframe/io_test.py
@@ -40,6 +40,10 @@ from apache_beam.io import restriction_trackers
 from apache_beam.testing.util import assert_that
 
 
+class SimpleRow(typing.NamedTuple):
+  value: int
+
+
 class MyRow(typing.NamedTuple):
   timestamp: int
   value: int
@@ -343,6 +347,31 @@ X     , c1, c2
     # Check that we've read (and removed) every output file
     self.assertEqual(len(glob.glob(f'{output}out.csv*')), 0)
 
+  def test_double_write(self):
+    output = self.temp_dir()
+    with beam.Pipeline() as p:
+      pc1 = p | 'create pc1' >> beam.Create(
+          [SimpleRow(value=i) for i in [1, 2]])
+      pc2 = p | 'create pc2' >> beam.Create(
+          [SimpleRow(value=i) for i in [3, 4]])
+
+      deferred_df1 = convert.to_dataframe(pc1)
+      deferred_df2 = convert.to_dataframe(pc2)
+
+      deferred_df1.to_csv(
+          f'{output}out1.csv',
+          transform_label="Writing to csv PC1",
+          index=False)
+      deferred_df2.to_csv(
+          f'{output}out2.csv',
+          transform_label="Writing to csv PC2",
+          index=False)
+
+    self.assertCountEqual(['value', '1', '2'],
+                          set(self.read_all_lines(output + 'out1.csv*')))
+    self.assertCountEqual(['value', '3', '4'],
+                          set(self.read_all_lines(output + 'out2.csv*')))
+
 
 if __name__ == '__main__':
   unittest.main()

Reply via email to