This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 081cb9a [BEAM-12701] Added extra parameter in to_csv for
DeferredFrame to name the PTransform label (#15450)
081cb9a is described below
commit 081cb9a51384bba1e66bb60bf1d61e84e817b4e1
Author: Eduardo Sánchez López <[email protected]>
AuthorDate: Thu Sep 16 19:27:23 2021 +0200
[BEAM-12701] Added extra parameter in to_csv for DeferredFrame to name the
PTransform label (#15450)
* [BEAM-12701] Added extra parameter in to_csv for DeferredFrame to name
the PTransform label
* Moved the changelog entry from Known Issues to Bugfixes
* Use a sensible default transform label; fix linter warnings
* Test fix: use a typing.NamedTuple row type
---
CHANGES.md | 2 ++
sdks/python/apache_beam/dataframe/io.py | 13 ++++++++-----
sdks/python/apache_beam/dataframe/io_test.py | 29 ++++++++++++++++++++++++++++
3 files changed, 39 insertions(+), 5 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index e6181da..2b3c370 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -89,6 +89,8 @@
## Bugfixes
* Fixed X (Java/Python)
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Fixed an error when writing multiple DeferredFrames to CSV in the same pipeline (Python)
+([BEAM-12701](https://issues.apache.org/jira/browse/BEAM-12701)).
## Known Issues
diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py
index 5046acc..5ef8e2c 100644
--- a/sdks/python/apache_beam/dataframe/io.py
+++ b/sdks/python/apache_beam/dataframe/io.py
@@ -74,16 +74,21 @@ def read_csv(path, *args, splittable=False, **kwargs):
splitter=_CsvSplitter(args, kwargs) if splittable else None)
-def _as_pc(df):
+def _as_pc(df, label=None):
from apache_beam.dataframe import convert # avoid circular import
# TODO(roberwb): Amortize the computation for multiple writes?
- return convert.to_pcollection(df, yield_elements='pandas')
+ return convert.to_pcollection(df, yield_elements='pandas', label=label)
@frame_base.with_docs_from(pd.DataFrame)
-def to_csv(df, path, *args, **kwargs):
-
- return _as_pc(df) | _WriteToPandas(
+def to_csv(df, path, *args, transform_label=None, **kwargs):
+ # transform_label is keyword-only; before *args it would capture the first
+ # positional pandas argument (e.g. sep) instead of forwarding it in args.
+ label_pc = f"{transform_label} - ToPCollection" if transform_label \
+ else f"ToPCollection(df) - {path}"
+ label_pd = f"{transform_label} - ToPandasDataFrame" if transform_label \
+ else f"WriteToPandas(df) - {path}"
+ return _as_pc(df, label_pc) | label_pd >> _WriteToPandas(
'to_csv', path, args, kwargs, incremental=True, binary=False)
diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py
index 374eb0c..060eebf 100644
--- a/sdks/python/apache_beam/dataframe/io_test.py
+++ b/sdks/python/apache_beam/dataframe/io_test.py
@@ -40,6 +40,10 @@ from apache_beam.io import restriction_trackers
from apache_beam.testing.util import assert_that
+class SimpleRow(typing.NamedTuple): # One-column row for test_double_write.
+ value: int
+
+
class MyRow(typing.NamedTuple):
timestamp: int
value: int
@@ -343,6 +347,31 @@ X , c1, c2
# Check that we've read (and removed) every output file
self.assertEqual(len(glob.glob(f'{output}out.csv*')), 0)
+ def test_double_write(self): # Regression test for BEAM-12701.
+ output = self.temp_dir()
+ with beam.Pipeline() as p:
+ pc1 = p | 'create pc1' >> beam.Create(
+ [SimpleRow(value=i) for i in [1, 2]])
+ pc2 = p | 'create pc2' >> beam.Create(
+ [SimpleRow(value=i) for i in [3, 4]])
+
+ deferred_df1 = convert.to_dataframe(pc1)
+ deferred_df2 = convert.to_dataframe(pc2)
+
+ deferred_df1.to_csv(
+ f'{output}out1.csv',
+ transform_label="Writing to csv PC1", # Each write gets its own label.
+ index=False)
+ deferred_df2.to_csv(
+ f'{output}out2.csv',
+ transform_label="Writing to csv PC2",
+ index=False)
+
+ self.assertCountEqual(['value', '1', '2'], # set(): ignore duplicate lines.
+ set(self.read_all_lines(output + 'out1.csv*')))
+ self.assertCountEqual(['value', '3', '4'],
+ set(self.read_all_lines(output + 'out2.csv*')))
+
if __name__ == '__main__':
unittest.main()