damccorm commented on code in PR #28486:
URL: https://github.com/apache/beam/pull/28486#discussion_r1330700743


##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -28,12 +28,38 @@
 import yaml
 
 import apache_beam as beam
+import apache_beam.io as beam_io
 from apache_beam.io import ReadFromBigQuery
 from apache_beam.io import WriteToBigQuery
 from apache_beam.io.gcp.bigquery import BigQueryDisposition
+from apache_beam.typehints.schemas import named_fields_from_element_type
 from apache_beam.yaml import yaml_provider
 
 
+def read_from_text(path: str):
+  # TODO(yaml): Consider passing the filename and offset, possibly even
+  # by default.

Review Comment:
   To be clear, you're suggesting passing them as fields in the returned
`beam.Row`? FWIW, I'm +1 on optionally doing that in the future (it might be
generally useful for `ReadFromText`). In particular, getting the filename would
likely be helpful in some use cases.



##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -28,12 +28,38 @@
 import yaml
 
 import apache_beam as beam
+import apache_beam.io as beam_io
 from apache_beam.io import ReadFromBigQuery
 from apache_beam.io import WriteToBigQuery
 from apache_beam.io.gcp.bigquery import BigQueryDisposition
+from apache_beam.typehints.schemas import named_fields_from_element_type
 from apache_beam.yaml import yaml_provider
 
 
+def read_from_text(path: str):
+  # TODO(yaml): Consider passing the filename and offset, possibly even
+  # by default.
+  return beam_io.ReadFromText(path) | beam.Map(lambda s: beam.Row(line=s))
+
+
+@beam.ptransform_fn
+def write_to_text(pcoll, path: str):
+  try:
+    field_names = [
+        name for name, _ in named_fields_from_element_type(pcoll.element_type)
+    ]
+  except Exception as exn:
+    raise ValueError(
+        "WriteToText requires an input schema with exactly one field.") from exn
+  if len(field_names) != 1:
+    raise ValueError(
+        "WriteToText requires an input schema with exactly one field, got %s" %
+        field_names)
+  sole_field_name, = field_names
+  return pcoll | beam.Map(
+      lambda x: str(getattr(x, sole_field_name))) | beam.io.WriteToText(path)

Review Comment:
   Should we take a (required?) `file_extension` parameter as well? Right now,
this will output files like `<path>-<shard_id>`, but I'd guess most people
will want `<path>-<shard_id>.<extension>`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.