liferoad commented on code in PR #28595:
URL: https://github.com/apache/beam/pull/28595#discussion_r1334292195
##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -123,6 +131,134 @@ def raise_exception(failed_row_with_error):
return WriteToBigQueryHandlingErrors()
+def _create_parser(format, schema):
+ if format == 'raw':
+ if schema:
+ raise ValueError('raw format does not take a schema')
+ return (
+ schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]),
+ lambda payload: beam.Row(payload=payload))
+ else:
+ raise ValueError(f'Unknown format: {format}')
+
+
+def _create_formatter(format, schema, beam_schema):
+ if format == 'raw':
+ if schema:
+ raise ValueError('raw format does not take a schema')
+ field_names = [field.name for field in beam_schema.fields]
+ if len(field_names) != 1:
+ raise ValueError(f'Expecting exactly one field, found {field_names}')
+ return lambda row: getattr(row, field_names[0])
+ else:
+ raise ValueError(f'Unknown format: {format}')
+
+
@beam.ptransform_fn
@yaml_mapping.maybe_with_exception_handling_transform_fn
def read_from_pubsub(
    root,
    *,
    topic: Optional[str] = None,
    subscription: Optional[str] = None,
    format: str,
    schema: Optional[Any] = None,
    attributes: Optional[Iterable[str]] = None,
    attributes_map: Optional[str] = None,
    timestamp_attribute: Optional[str] = None,
    id_label: Optional[str] = None):
  """Reads messages from Cloud Pub/Sub and parses them into schema'd rows.

  Exactly one of ``topic`` or ``subscription`` must be given.

  Args:
    root: The pipeline root this transform is applied to.
    topic: Pub/Sub topic to read from.
    subscription: Pub/Sub subscription to read from.
    format: Payload encoding, interpreted by ``_create_parser``.
    schema: Optional payload schema, interpreted according to ``format``.
    attributes: Names of message attributes to copy into string-typed fields
      of each output row. A single string is treated as a one-element list.
    attributes_map: If set, the name of an extra field that receives the
      message's full attribute mapping.
    timestamp_attribute: Forwarded unchanged to ``ReadFromPubSub``.
    id_label: Forwarded unchanged to ``ReadFromPubSub``, which uses that
      attribute as a unique record identifier for deduplication.

  Returns:
    A PCollection of rows whose schema is the payload schema followed by the
    requested attribute fields.

  Raises:
    TypeError: if both or neither of ``topic`` and ``subscription`` are set.
    ValueError: propagated from ``_create_parser`` for unsupported
      ``format``/``schema`` combinations.
  """
  if topic and subscription:
    raise TypeError('Only one of topic and subscription may be specified.')
  elif not topic and not subscription:
    raise TypeError('One of topic or subscription must be specified.')
  payload_schema, parser = _create_parser(format, schema)

  if isinstance(attributes, str):
    attributes = [attributes]
  # Materialize once: the names are used to build the output schema here and
  # again for every message in `mapper`, so a one-shot iterator must not be
  # consumed by the first pass.
  attributes = list(attributes or ())

  extra_fields = [schemas.schema_field(attr, str) for attr in attributes]
  if attributes_map:
    extra_fields.append(
        schemas.schema_field(attributes_map, Mapping[str, str]))

  with_attributes = bool(attributes or attributes_map)
  if with_attributes:

    def mapper(msg):
      values = parser(msg.data).as_dict()
      # NOTE(review): a missing attribute raises KeyError here; open question
      # whether missing attributes should be optional or parse errors.
      for attr in attributes:
        values[attr] = msg.attributes[attr]
      if attributes_map:
        values[attributes_map] = msg.attributes
      return beam.Row(**values)
  else:
    # Without attributes the source yields the raw payload, which the parser
    # handles directly.
    mapper = parser

  output = (
      root
      | beam.io.ReadFromPubSub(
          topic=topic,
          subscription=subscription,
          id_label=id_label,
          with_attributes=with_attributes,
          timestamp_attribute=timestamp_attribute)
      | 'ParseMessage' >> beam.Map(mapper))
  output.element_type = schemas.named_tuple_from_schema(
      schema_pb2.Schema(fields=list(payload_schema.fields) + extra_fields))
  return output
+
+
@beam.ptransform_fn
@yaml_mapping.maybe_with_exception_handling_transform_fn
def write_to_pubsub(
    pcoll,
    *,
    topic: str,
    format: str,
    schema: Optional[Any] = None,
    attributes: Optional[Iterable[str]] = None,
    attributes_map: Optional[str] = None,
    timestamp_attribute: Optional[str] = None,
    id_label: Optional[str] = None):
  """Formats schema'd rows as Pub/Sub messages and publishes them.

  Fields named in ``attributes`` / ``attributes_map`` become message
  attributes. All remaining fields form the payload, which is encoded by
  ``_create_formatter``.

  Args:
    pcoll: Input PCollection with a schema'd element type.
    topic: Pub/Sub topic to write to.
    format: Payload encoding, interpreted by ``_create_formatter``.
    schema: Optional payload schema, interpreted according to ``format``.
    attributes: Names of row fields to publish as individual attributes.
      A single string is treated as a one-element list.
    attributes_map: If set, the name of a mapping-valued row field whose
      entries are published as attributes.
    timestamp_attribute: Forwarded unchanged to ``WriteToPubSub``.
    id_label: Forwarded unchanged to ``WriteToPubSub``, which sets that
      attribute to a unique value so readers can deduplicate messages.

  Raises:
    ValueError: if an attribute field is not in the input schema, or if
      ``_create_formatter`` rejects the format/payload schema.
  """
  input_schema = schemas.schema_from_element_type(pcoll.element_type)

  if isinstance(attributes, str):
    attributes = [attributes]
  # Materialize once: the names are used for validation here and again for
  # every row in `attributes_extractor`, so a one-shot iterator must not be
  # consumed by the first pass.
  attributes = list(attributes or ())

  extra_fields = list(attributes)
  if attributes_map:
    extra_fields.append(attributes_map)
  extra_field_set = set(extra_fields)

  schema_names = set(f.name for f in input_schema.fields)
  missing_attribute_names = extra_field_set - schema_names
  if missing_attribute_names:
    raise ValueError(
        f'Attribute fields {missing_attribute_names} '
        f'not found in schema fields {schema_names}')

  def attributes_extractor(row):
    if attributes_map:
      # A null map field contributes no attributes instead of failing in
      # dict(None).
      attribute_values = dict(getattr(row, attributes_map) or {})
    else:
      attribute_values = {}
    # Individually named attributes are applied last, so they override
    # entries with the same key from the map.
    attribute_values.update({attr: getattr(row, attr) for attr in attributes})
    return attribute_values

  payload_schema = schema_pb2.Schema(
      fields=[
          field for field in input_schema.fields
          if field.name not in extra_field_set
      ])
  formatter = _create_formatter(format, schema, payload_schema)
  return (
      pcoll
      | beam.Map(
          lambda row: beam.io.gcp.pubsub.PubsubMessage(
              formatter(row), attributes_extractor(row)))
      | beam.io.WriteToPubSub(
          topic,
          with_attributes=True,
          id_label=id_label,
          timestamp_attribute=timestamp_attribute))
Review Comment:
Similarly, should this also expose an `id_label` option?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org