liferoad commented on code in PR #28595:
URL: https://github.com/apache/beam/pull/28595#discussion_r1334286924


##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -123,6 +131,134 @@ def raise_exception(failed_row_with_error):
   return WriteToBigQueryHandlingErrors()
 
 
+def _create_parser(format, schema):
+  if format == 'raw':
+    if schema:
+      raise ValueError('raw format does not take a schema')
+    return (
+        schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]),
+        lambda payload: beam.Row(payload=payload))
+  else:
+    raise ValueError(f'Unknown format: {format}')
+
+
+def _create_formatter(format, schema, beam_schema):
+  if format == 'raw':
+    if schema:
+      raise ValueError('raw format does not take a schema')
+    field_names = [field.name for field in beam_schema.fields]
+    if len(field_names) != 1:
+      raise ValueError(f'Expecting exactly one field, found {field_names}')
+    return lambda row: getattr(row, field_names[0])
+  else:
+    raise ValueError(f'Unknown format: {format}')
+
+
+@beam.ptransform_fn
+@yaml_mapping.maybe_with_exception_handling_transform_fn
+def read_from_pubsub(
+    root,
+    *,
+    topic: Optional[str] = None,
+    subscription: Optional[str] = None,
+    format: str,
+    schema: Optional[Any] = None,
+    attributes: Optional[Iterable[str]] = None,
+    attributes_map: Optional[str] = None,
+    timestamp_attribute: Optional[str] = None):

Review Comment:
   Do we need docstrings?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to