Copilot commented on code in PR #38985:
URL: https://github.com/apache/beam/pull/38985#discussion_r3422650728
##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -366,14 +368,16 @@ def read_from_pubsub(
``2015-10-29T23:41:41.123Z``. The sub-second component of the
timestamp is optional, and digits beyond the first three (i.e., time
units smaller than milliseconds) may be ignored.
+ publish_time_field: Field to add to output messages with the Pub/Sub
+ message publish time. If None, no such field is added.
"""
if topic and subscription:
raise TypeError('Only one of topic and subscription may be specified.')
elif not topic and not subscription:
raise TypeError('One of topic or subscription may be specified.')
payload_schema, parser = _create_parser(format, schema)
extra_fields: list[schema_pb2.Field] = []
- if not attributes and not attributes_map:
+ if not attributes and not attributes_map and not publish_time_field:
Review Comment:
The docstring says that only `None` disables this feature, but the
implementation uses truthiness checks (`if publish_time_field:` and
`not publish_time_field`), so an empty string is also treated as disabled and
the field is silently skipped. Prefer checking `publish_time_field is None`
explicitly, and raise a clear error for empty or whitespace-only field names
to avoid unexpected no-op configurations.
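A minimal sketch of the explicit check suggested above. The helper name
`validate_publish_time_field` is hypothetical and not part of `yaml_io.py`, and
raising `ValueError` is an assumption (the surrounding code raises `TypeError`
for other configuration errors, so either could fit):

```python
from typing import Optional


def validate_publish_time_field(
    publish_time_field: Optional[str]) -> Optional[str]:
  """Hypothetical helper: returns the field name, or None when disabled."""
  # Only None means "feature disabled", matching the docstring.
  if publish_time_field is None:
    return None
  # Reject non-strings and empty or whitespace-only names instead of
  # silently treating them as "disabled".
  if not isinstance(publish_time_field, str) or not publish_time_field.strip():
    raise ValueError(
        'publish_time_field must be a non-empty field name or None, '
        'got %r' % (publish_time_field, ))
  return publish_time_field
```

With a helper like this, the later checks could compare the validated value
against `None` rather than relying on truthiness.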
##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -393,14 +399,18 @@ def mapper(msg):
values[attr] = msg.attributes[attr]
if attributes_map:
values[attributes_map] = msg.attributes
+ if publish_time_field:
+ values[publish_time_field] = Timestamp.from_utc_datetime(
+ msg.publish_time)
Review Comment:
The docstring says that only `None` disables this feature, but the
implementation uses truthiness checks (`if publish_time_field:` and
`not publish_time_field`), so an empty string is also treated as disabled and
the field is silently skipped. Prefer checking `publish_time_field is None`
explicitly, and raise a clear error for empty or whitespace-only field names
to avoid unexpected no-op configurations.
##########
sdks/python/apache_beam/yaml/yaml_io_test.py:
##########
@@ -181,6 +183,40 @@ def test_read_with_attribute_map(self):
beam.Row(payload=b'msg2', attrMap={'attr': 'value2'})
]))
+ def test_read_with_publish_time_field(self):
+ publish_time_1 = datetime.datetime(
+ 2018, 3, 12, 13, 37, 1, 234567, tzinfo=datetime.timezone.utc)
+ publish_time_2 = datetime.datetime(
+ 2018, 3, 12, 13, 38, 2, 345678, tzinfo=datetime.timezone.utc)
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ with mock.patch('apache_beam.io.ReadFromPubSub',
+ FakeReadFromPubSub(
+ topic='my_topic',
+ messages=[PubsubMessage(b'msg1', {'attr': 'value1'},
+ publish_time=publish_time_1),
+ PubsubMessage(b'msg2', {'attr': 'value2'},
+ publish_time=publish_time_2)
+ ])):
+ result = p | YamlTransform(
+ '''
+ type: ReadFromPubSub
+ config:
+ topic: my_topic
+ format: RAW
+ publish_time_field: publish_time
+ ''')
Review Comment:
This test covers the `publish_time_field`-only path, but the updated
implementation also supports combining `publish_time_field` with `attributes`
and/or `attributes_map` in the same output row. Add at least one test that sets
`publish_time_field` alongside `attributes_map` (and/or `attributes`) to verify
the schema and produced rows include both the attribute-derived fields and the
publish time field.
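The combined case can be illustrated without Beam. The sketch below is only a
stand-in for the `mapper` in `read_from_pubsub`: `FakeMessage` and `build_row`
are hypothetical names, and the publish time is kept as a plain `datetime`
rather than being converted with `Timestamp.from_utc_datetime`. It shows the
row shape a combined test would be expected to assert on:

```python
import datetime
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeMessage:
  """Hypothetical stand-in for a Pub/Sub message."""
  data: bytes
  attributes: dict
  publish_time: datetime.datetime


def build_row(
    msg: FakeMessage,
    attributes_map: Optional[str] = None,
    publish_time_field: Optional[str] = None) -> dict:
  """Builds one output row, mirroring the structure of the mapper hunk."""
  values = {'payload': msg.data}
  if attributes_map is not None:
    values[attributes_map] = msg.attributes
  if publish_time_field is not None:
    values[publish_time_field] = msg.publish_time
  return values


publish_time = datetime.datetime(
    2018, 3, 12, 13, 37, 1, 234567, tzinfo=datetime.timezone.utc)
row = build_row(
    FakeMessage(b'msg1', {'attr': 'value1'}, publish_time),
    attributes_map='attrMap',
    publish_time_field='publish_time')
```

A real test would presumably follow `test_read_with_attribute_map`, adding
`publish_time_field` to the YAML config and asserting that each `beam.Row`
carries both `attrMap` and the publish time.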
##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -384,6 +388,8 @@ def read_from_pubsub(
if attributes_map:
extra_fields.append(
schemas.schema_field(attributes_map, Mapping[str, str]))
+ if publish_time_field:
+ extra_fields.append(schemas.schema_field(publish_time_field, Timestamp))
Review Comment:
The docstring says that only `None` disables this feature, but the
implementation uses truthiness checks (`if publish_time_field:` and
`not publish_time_field`), so an empty string is also treated as disabled and
the field is silently skipped. Prefer checking `publish_time_field is None`
explicitly, and raise a clear error for empty or whitespace-only field names
to avoid unexpected no-op configurations.