liferoad commented on code in PR #28595:
URL: https://github.com/apache/beam/pull/28595#discussion_r1334773068
##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -123,6 +131,134 @@ def raise_exception(failed_row_with_error):
return WriteToBigQueryHandlingErrors()
+def _create_parser(format, schema):
+ if format == 'raw':
+ if schema:
+ raise ValueError('raw format does not take a schema')
+ return (
+ schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]),
+ lambda payload: beam.Row(payload=payload))
+ else:
+ raise ValueError(f'Unknown format: {format}')
+
+
+def _create_formatter(format, schema, beam_schema):
+ if format == 'raw':
+ if schema:
+ raise ValueError('raw format does not take a schema')
+ field_names = [field.name for field in beam_schema.fields]
+ if len(field_names) != 1:
+ raise ValueError(f'Expecting exactly one field, found {field_names}')
+ return lambda row: getattr(row, field_names[0])
+ else:
+ raise ValueError(f'Unknown format: {format}')
+
+
+@beam.ptransform.ptransform_fn
+@yaml_mapping.maybe_with_exception_handling_transform_fn
+def read_from_pubsub(
+ root,
+ *,
+ topic: Optional[str] = None,
+ subscription: Optional[str] = None,
+ format: str,
+ schema: Optional[Any] = None,
+ attributes: Optional[Iterable[str]] = None,
+ attributes_map: Optional[str] = None,
+ timestamp_attribute: Optional[str] = None):
+ if topic and subscription:
+ raise TypeError('Only one of topic and subscription may be specified.')
+ elif not topic and not subscription:
+ raise TypeError('One of topic or subscription may be specified.')
+ payload_schema, parser = _create_parser(format, schema)
+ extra_fields = []
+ if not attributes and not attributes_map:
+ mapper = lambda msg: parser(msg)
+ else:
+ if isinstance(attributes, str):
+ attributes = [attributes]
+ if attributes:
+ extra_fields.extend(
+ [schemas.schema_field(attr, str) for attr in attributes])
+ if attributes_map:
+ extra_fields.append(
+ schemas.schema_field(attributes_map, Mapping[str, str]))
+
+ def mapper(msg):
+ values = parser(msg.data).as_dict()
+ if attributes:
+ # Should missing attributes be optional or parse errors?
+ for attr in attributes:
+ values[attr] = msg.attributes[attr]
+ if attributes_map:
+ values[attributes_map] = msg.attributes
+ return beam.Row(**values)
+
+ output = (
+ root
+ | beam.io.ReadFromPubSub(
+ topic=topic,
+ subscription=subscription,
+ with_attributes=bool(attributes or attributes_map),
+ timestamp_attribute=timestamp_attribute)
+ | 'ParseMessage' >> beam.Map(mapper))
Review Comment:
Thanks. Got it now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org