robertwb commented on code in PR #28595:
URL: https://github.com/apache/beam/pull/28595#discussion_r1334728490


##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -123,6 +131,134 @@ def raise_exception(failed_row_with_error):
   return WriteToBigQueryHandlingErrors()
 
 
+def _create_parser(format, schema):
+  if format == 'raw':
+    if schema:
+      raise ValueError('raw format does not take a schema')
+    return (
+        schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]),
+        lambda payload: beam.Row(payload=payload))
+  else:
+    raise ValueError(f'Unknown format: {format}')
+
+
+def _create_formatter(format, schema, beam_schema):
+  if format == 'raw':
+    if schema:
+      raise ValueError('raw format does not take a schema')
+    field_names = [field.name for field in beam_schema.fields]
+    if len(field_names) != 1:
+      raise ValueError(f'Expecting exactly one field, found {field_names}')
+    return lambda row: getattr(row, field_names[0])
+  else:
+    raise ValueError(f'Unknown format: {format}')
+
+
+@beam.ptransform_fn
+@yaml_mapping.maybe_with_exception_handling_transform_fn
+def read_from_pubsub(
+    root,
+    *,
+    topic: Optional[str] = None,
+    subscription: Optional[str] = None,
+    format: str,
+    schema: Optional[Any] = None,
+    attributes: Optional[Iterable[str]] = None,
+    attributes_map: Optional[str] = None,
+    timestamp_attribute: Optional[str] = None):

Review Comment:
   These docstrings aren't really exposed anywhere, but I'd like to do so 
eventually, and they could serve as good internal documentation as well. Done. 



##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -123,6 +131,134 @@ def raise_exception(failed_row_with_error):
   return WriteToBigQueryHandlingErrors()
 
 
+def _create_parser(format, schema):
+  if format == 'raw':
+    if schema:
+      raise ValueError('raw format does not take a schema')
+    return (
+        schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]),
+        lambda payload: beam.Row(payload=payload))
+  else:
+    raise ValueError(f'Unknown format: {format}')
+
+
+def _create_formatter(format, schema, beam_schema):
+  if format == 'raw':
+    if schema:
+      raise ValueError('raw format does not take a schema')
+    field_names = [field.name for field in beam_schema.fields]
+    if len(field_names) != 1:
+      raise ValueError(f'Expecting exactly one field, found {field_names}')
+    return lambda row: getattr(row, field_names[0])
+  else:
+    raise ValueError(f'Unknown format: {format}')
+
+
+@beam.ptransform_fn
+@yaml_mapping.maybe_with_exception_handling_transform_fn
+def read_from_pubsub(
+    root,
+    *,
+    topic: Optional[str] = None,
+    subscription: Optional[str] = None,
+    format: str,
+    schema: Optional[Any] = None,
+    attributes: Optional[Iterable[str]] = None,
+    attributes_map: Optional[str] = None,
+    timestamp_attribute: Optional[str] = None):
+  if topic and subscription:
+    raise TypeError('Only one of topic and subscription may be specified.')
+  elif not topic and not subscription:
+    raise TypeError('One of topic or subscription may be specified.')
+  payload_schema, parser = _create_parser(format, schema)
+  extra_fields = []
+  if not attributes and not attributes_map:
+    mapper = lambda msg: parser(msg)
+  else:
+    if isinstance(attributes, str):
+      attributes = [attributes]
+    if attributes:
+      extra_fields.extend(
+          [schemas.schema_field(attr, str) for attr in attributes])
+    if attributes_map:
+      extra_fields.append(
+          schemas.schema_field(attributes_map, Mapping[str, str]))
+
+    def mapper(msg):
+      values = parser(msg.data).as_dict()
+      if attributes:
+        # Should missing attributes be optional or parse errors?
+        for attr in attributes:
+          values[attr] = msg.attributes[attr]
+      if attributes_map:
+        values[attributes_map] = msg.attributes
+      return beam.Row(**values)
+
+  output = (
+      root
+      | beam.io.ReadFromPubSub(
+          topic=topic,
+          subscription=subscription,
+          with_attributes=bool(attributes or attributes_map),
+          timestamp_attribute=timestamp_attribute)
+      | 'ParseMessage' >> beam.Map(mapper))
+  output.element_type = schemas.named_tuple_from_schema(
+      schema_pb2.Schema(fields=list(payload_schema.fields) + extra_fields))
+  return output
+
+
+@beam.ptransform_fn
+@yaml_mapping.maybe_with_exception_handling_transform_fn
+def write_to_pubsub(
+    pcoll,
+    *,
+    topic: str,
+    format: str,
+    schema: Optional[Any] = None,
+    attributes: Optional[Iterable[str]] = None,
+    attributes_map: Optional[str] = None,
+    timestamp_attribute: Optional[str] = None):
+
+  input_schema = schemas.schema_from_element_type(pcoll.element_type)
+
+  extra_fields = []
+  if isinstance(attributes, str):
+    attributes = [attributes]
+  if attributes:
+    extra_fields.extend(attributes)
+  if attributes_map:
+    extra_fields.append(attributes_map)
+
+  def attributes_extractor(row):
+    if attributes_map:
+      attribute_values = dict(getattr(row, attributes_map))
+    else:
+      attribute_values = {}
+    if attributes:
+      attribute_values.update({attr: getattr(row, attr) for attr in attributes})
+    return attribute_values
+
+  schema_names = set(f.name for f in input_schema.fields)
+  missing_attribute_names = set(extra_fields) - schema_names
+  if missing_attribute_names:
+    raise ValueError(
+        f'Attribute fields {missing_attribute_names} '
+        f'not found in schema fields {schema_names}')
+
+  payload_schema = schema_pb2.Schema(
+      fields=[
+          field for field in input_schema.fields
+          if field.name not in extra_fields
+      ])
+  formatter = _create_formatter(format, schema, payload_schema)
+  return (
+      pcoll | beam.Map(
+          lambda row: beam.io.gcp.pubsub.PubsubMessage(
+              formatter(row), attributes_extractor(row)))
+      | beam.io.WriteToPubSub(
+          topic, with_attributes=True, timestamp_attribute=timestamp_attribute))

Review Comment:
   Done.



##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -123,6 +131,134 @@ def raise_exception(failed_row_with_error):
   return WriteToBigQueryHandlingErrors()
 
 
+def _create_parser(format, schema):
+  if format == 'raw':
+    if schema:
+      raise ValueError('raw format does not take a schema')
+    return (
+        schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]),
+        lambda payload: beam.Row(payload=payload))
+  else:
+    raise ValueError(f'Unknown format: {format}')
+
+
+def _create_formatter(format, schema, beam_schema):
+  if format == 'raw':
+    if schema:
+      raise ValueError('raw format does not take a schema')
+    field_names = [field.name for field in beam_schema.fields]
+    if len(field_names) != 1:
+      raise ValueError(f'Expecting exactly one field, found {field_names}')
+    return lambda row: getattr(row, field_names[0])
+  else:
+    raise ValueError(f'Unknown format: {format}')
+
+
+@beam.ptransform_fn
+@yaml_mapping.maybe_with_exception_handling_transform_fn
+def read_from_pubsub(
+    root,
+    *,
+    topic: Optional[str] = None,
+    subscription: Optional[str] = None,
+    format: str,
+    schema: Optional[Any] = None,
+    attributes: Optional[Iterable[str]] = None,
+    attributes_map: Optional[str] = None,
+    timestamp_attribute: Optional[str] = None):
+  if topic and subscription:
+    raise TypeError('Only one of topic and subscription may be specified.')
+  elif not topic and not subscription:
+    raise TypeError('One of topic or subscription may be specified.')
+  payload_schema, parser = _create_parser(format, schema)
+  extra_fields = []
+  if not attributes and not attributes_map:
+    mapper = lambda msg: parser(msg)
+  else:
+    if isinstance(attributes, str):
+      attributes = [attributes]
+    if attributes:
+      extra_fields.extend(
+          [schemas.schema_field(attr, str) for attr in attributes])
+    if attributes_map:
+      extra_fields.append(
+          schemas.schema_field(attributes_map, Mapping[str, str]))
+
+    def mapper(msg):
+      values = parser(msg.data).as_dict()
+      if attributes:
+        # Should missing attributes be optional or parse errors?
+        for attr in attributes:
+          values[attr] = msg.attributes[attr]
+      if attributes_map:
+        values[attributes_map] = msg.attributes
+      return beam.Row(**values)
+
+  output = (
+      root
+      | beam.io.ReadFromPubSub(
+          topic=topic,
+          subscription=subscription,
+          with_attributes=bool(attributes or attributes_map),
+          timestamp_attribute=timestamp_attribute)
+      | 'ParseMessage' >> beam.Map(mapper))

Review Comment:
   No, as this is a subtransform of this (uniquely named) composite read 
operation. (Its full name will be 
[qualified-name-of-outer-operation]/ParseMessage.)



##########
sdks/python/apache_beam/yaml/yaml_io_test.py:
##########
@@ -0,0 +1,201 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import collections
+import logging
+import unittest
+
+import mock
+
+import apache_beam as beam
+from apache_beam.io.gcp.pubsub import PubsubMessage
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import AssertThat
+from apache_beam.testing.util import equal_to
+from apache_beam.yaml import yaml_provider
+from apache_beam.yaml.yaml_transform import YamlTransform
+
+
+class FakeReadFromPubSub:

Review Comment:
   The full testing story is still being worked on (though I would hope we can 
trust the underlying PubSub classes to be sufficiently tested and correct and 
not have to write integration tests for every use of them). 
   
   (Personally, I generally avoid mocks, but the overhead of actually spinning 
up and testing against production pubsub is fairly heavyweight and not needed 
to exercise the new codepaths here...)



##########
sdks/python/apache_beam/yaml/standard_io.yaml:
##########
@@ -53,6 +53,8 @@
     # 'WriteToBigQuery': 'apache_beam.yaml.yaml_io.write_to_bigquery'
     'ReadFromText': 'apache_beam.yaml.yaml_io.read_from_text'
     'WriteToText': 'apache_beam.yaml.yaml_io.write_to_text'
+    'ReadFromPubSub': 'apache_beam.yaml.yaml_io.read_from_pubsub'

Review Comment:
   Modules named io are painful because there's a top-level module named io 
already, as well as beam.io. 



##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -123,6 +131,134 @@ def raise_exception(failed_row_with_error):
   return WriteToBigQueryHandlingErrors()
 
 
+def _create_parser(format, schema):
+  if format == 'raw':
+    if schema:
+      raise ValueError('raw format does not take a schema')
+    return (
+        schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]),
+        lambda payload: beam.Row(payload=payload))
+  else:
+    raise ValueError(f'Unknown format: {format}')
+
+
+def _create_formatter(format, schema, beam_schema):
+  if format == 'raw':
+    if schema:
+      raise ValueError('raw format does not take a schema')
+    field_names = [field.name for field in beam_schema.fields]
+    if len(field_names) != 1:
+      raise ValueError(f'Expecting exactly one field, found {field_names}')
+    return lambda row: getattr(row, field_names[0])
+  else:
+    raise ValueError(f'Unknown format: {format}')
+
+
+@beam.ptransform_fn
+@yaml_mapping.maybe_with_exception_handling_transform_fn
+def read_from_pubsub(
+    root,
+    *,
+    topic: Optional[str] = None,
+    subscription: Optional[str] = None,
+    format: str,
+    schema: Optional[Any] = None,
+    attributes: Optional[Iterable[str]] = None,
+    attributes_map: Optional[str] = None,
+    timestamp_attribute: Optional[str] = None):
+  if topic and subscription:
+    raise TypeError('Only one of topic and subscription may be specified.')
+  elif not topic and not subscription:
+    raise TypeError('One of topic or subscription may be specified.')
+  payload_schema, parser = _create_parser(format, schema)
+  extra_fields = []
+  if not attributes and not attributes_map:
+    mapper = lambda msg: parser(msg)
+  else:
+    if isinstance(attributes, str):
+      attributes = [attributes]
+    if attributes:
+      extra_fields.extend(
+          [schemas.schema_field(attr, str) for attr in attributes])
+    if attributes_map:
+      extra_fields.append(
+          schemas.schema_field(attributes_map, Mapping[str, str]))
+
+    def mapper(msg):
+      values = parser(msg.data).as_dict()
+      if attributes:
+        # Should missing attributes be optional or parse errors?
+        for attr in attributes:
+          values[attr] = msg.attributes[attr]
+      if attributes_map:
+        values[attributes_map] = msg.attributes
+      return beam.Row(**values)
+
+  output = (
+      root
+      | beam.io.ReadFromPubSub(
+          topic=topic,

Review Comment:
   No. I've added it as id_attribute (for consistency with Java, and because 
that's a better name).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to