damccorm commented on code in PR #28486:
URL: https://github.com/apache/beam/pull/28486#discussion_r1330700743

##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -28,12 +28,38 @@
 import yaml
 import apache_beam as beam
+import apache_beam.io as beam_io
 from apache_beam.io import ReadFromBigQuery
 from apache_beam.io import WriteToBigQuery
 from apache_beam.io.gcp.bigquery import BigQueryDisposition
+from apache_beam.typehints.schemas import named_fields_from_element_type
 from apache_beam.yaml import yaml_provider
+def read_from_text(path: str):
+  # TODO(yaml): Consider passing the filename and offset, possibly even
+  # by default.

Review Comment:
To be clear, you're saying pass them as fields in the returned beam.Row? I'm +1 on optionally doing that in the future FWIW (it would maybe be generally useful for ReadFromText) - in particular, getting the filename would likely be helpful in some use cases

##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -28,12 +28,38 @@
 import yaml
 import apache_beam as beam
+import apache_beam.io as beam_io
 from apache_beam.io import ReadFromBigQuery
 from apache_beam.io import WriteToBigQuery
 from apache_beam.io.gcp.bigquery import BigQueryDisposition
+from apache_beam.typehints.schemas import named_fields_from_element_type
 from apache_beam.yaml import yaml_provider
+def read_from_text(path: str):
+  # TODO(yaml): Consider passing the filename and offset, possibly even
+  # by default.
+  return beam_io.ReadFromText(path) | beam.Map(lambda s: beam.Row(line=s))
+
+
+@beam.ptransform_fn
+def write_to_text(pcoll, path: str):
+  try:
+    field_names = [
+        name for name, _ in named_fields_from_element_type(pcoll.element_type)
+    ]
+  except Exception as exn:
+    raise ValueError(
+        "WriteToText requires an input schema with exactly one field.") from exn
+  if len(field_names) != 1:
+    raise ValueError(
+        "WriteToText requires an input schema with exactly one field, got %s" %
+        field_names)
+  sole_field_name, = field_names
+  return pcoll | beam.Map(
+      lambda x: str(getattr(x, sole_field_name))) | beam.io.WriteToText(path)

Review Comment:
Should we take a (required?) `file_extension` parameter as well? Right now, this will output files like: `<path>-<shard_id>`, but I'd guess most people will want `<path>-<shard_id>.<extension>`
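
To make the `file_extension` suggestion concrete, here is a rough sketch of how such a parameter might be normalized before use. The helper name `normalize_file_extension` is hypothetical and not part of this PR; it only assumes that callers may write either `txt` or `.txt` and that an empty value should mean "no suffix".

```python
def normalize_file_extension(file_extension: str) -> str:
    """Return a suffix with exactly one leading dot, or '' if none is given.

    Hypothetical helper for illustration only; it is not part of the PR.
    """
    # Accept both "txt" and ".txt" by dropping whitespace and leading dots.
    stripped = file_extension.strip().lstrip(".")
    return f".{stripped}" if stripped else ""


# Show how a few possible user inputs would be normalized.
for raw in ("txt", ".txt", ""):
    print(repr(raw), "->", repr(normalize_file_extension(raw)))
```

The normalized value could then be forwarded as the existing `file_name_suffix` argument of `beam.io.WriteToText`, which appends it after the shard portion of each output file name. Whether the YAML-level parameter should be required is left open here, as in the comment above.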
