udim commented on a change in pull request #12076:
URL: https://github.com/apache/beam/pull/12076#discussion_r445257811
##########
File path: sdks/python/apache_beam/transforms/sql_test.py
##########
@@ -157,6 +157,21 @@ def test_zetasql_generate_data(self):
           dialect="zetasql")
       assert_that(out, equal_to([(1, "foo", 3.14)]))
+  def test_windowing_before_sql(self):
+    with TestPipeline() as p:
+      windowed = (
+          p | beam.Create([
+              SimpleRow(5, "foo", 1.),
+              SimpleRow(15, "bar", 2.),
+              SimpleRow(25, "baz", 3.)
+          ])
+          | beam.Map(lambda v: beam.window.TimestampedValue(v, v.id)).
+          with_output_types(SimpleRow)
Review comment:
I'm not sure there is a more elegant way. Even if you turn that lambda
into a function, the output type decorator and the actual return value will disagree:
```py
def test_timestamped_value(self):
  @beam.typehints.with_input_types(int)
  @beam.typehints.with_output_types(int)
  def timestamped(e):
    return beam.window.TimestampedValue(e, 0)

  with TestPipeline() as p:
    pcoll = p | beam.Create([1, 2, 3]) | beam.Map(timestamped)
    self.assertEqual(int, pcoll.element_type)
```
I prefer the decorator style above to inlining `.with_output_types`, so it's
clear that I'm not making an exception.
This mismatch is normal in Beam: the return type of a `DoFn.process()` method
and its type hint disagree as well. We should probably add functionality to
support annotating the above function like this:
```py
def timestamped(e: int) -> beam.window.TimestampedValue[int]:
  return beam.window.TimestampedValue(e, 0)
```
`pcoll.element_type` would then be interpreted as `int`.
The same would apply to `WindowedValues`.
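
To make the idea concrete, here is a rough sketch of the unwrapping step in plain Python. It does not use Beam's type-hint machinery: `TimestampedValue` below is a hypothetical generic stand-in for `beam.window.TimestampedValue`, and `element_type_from_annotation` is a name made up for illustration. It assumes Python 3.8+ for `typing.get_origin` and `typing.get_args`.

```py
import typing
from typing import Generic, TypeVar

T = TypeVar("T")


class TimestampedValue(Generic[T]):
  """Hypothetical generic stand-in for beam.window.TimestampedValue."""
  def __init__(self, value: T, timestamp: float):
    self.value = value
    self.timestamp = timestamp


def element_type_from_annotation(fn):
  """Returns fn's return annotation, minus a TimestampedValue[...] wrapper.

  Hypothetical helper, not part of Beam.
  """
  ret = typing.get_type_hints(fn).get("return", typing.Any)
  if typing.get_origin(ret) is TimestampedValue:
    # TimestampedValue[int] -> int
    (inner,) = typing.get_args(ret)
    return inner
  return ret


def timestamped(e: int) -> TimestampedValue[int]:
  return TimestampedValue(e, 0)


def identity(e: int) -> int:
  return e
```

With this, both `timestamped` and `identity` would report `int` as their element type, which is what `pcoll.element_type` should end up being in either case.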