Hey Max,
Thanks for kicking the tires on SqlTransform in Python :)

We don't have any tests of windowing and SQL in Python yet, so I'm not that
surprised you're running into issues here. Portable schemas don't support
the DATETIME type, because we decided not to define it as one of the atomic
types [1] and hope to add support via a logical type instead (see BEAM-7554
[2]). This was the motivation for the MillisInstant PR I put up, and the
ongoing discussion [3].
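
To make the idea concrete, here's a rough sketch (not Beam code, function
names are mine) of the representation that discussion is about: encode an
instant as INT64 milliseconds since the epoch, carried as a logical type
over the existing atomic INT64, instead of adding a DATETIME atomic type:

```python
from datetime import datetime, timezone

def to_millis(dt: datetime) -> int:
    # Encode an instant as INT64 milliseconds since the Unix epoch,
    # the representation a MillisInstant-style logical type would use.
    return int(dt.timestamp() * 1000)

def from_millis(millis: int) -> datetime:
    # Decode back to an aware UTC datetime; precision is millisecond.
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
```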
Regardless, that should only be an obstacle for option (b), where you'd
need to have a DATETIME in the input and/or output PCollection of the
SqlTransform. In theory option (a) should be possible, so I'd consider that
a bug - can you file a jira for it?
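
In the meantime, one way to keep DATETIME out of your row schema is to
carry the event time as a plain numeric field. A sketch (the Event type
and field names here are hypothetical, and whether Beam SQL will window
on such a field is exactly what the logical-type work needs to settle):

```python
import typing

# Hypothetical row type: store the event time as INT64 millis rather
# than a DATETIME, since portable schemas have no DATETIME atomic type.
class Event(typing.NamedTuple):
    field: str
    ts_millis: int

def with_event_time(field: str, timestamp_seconds: float) -> Event:
    # Beam element timestamps are seconds since the epoch; store millis.
    return Event(field=field, ts_millis=int(timestamp_seconds * 1000))
```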

Brian

[1]
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto#L58
[2] https://issues.apache.org/jira/browse/BEAM-7554
[3]
https://lists.apache.org/thread.html/r2e05355b74fb5b8149af78ade1e3539ec08371a9a4b2b9e45737e6be%40%3Cdev.beam.apache.org%3E

On Thu, May 28, 2020 at 9:45 AM Maximilian Michels <[email protected]> wrote:

> Hi,
>
> I'm using the SqlTransform as an external transform from within a Python
> pipeline. The SQL docs [1] mention that you can either (a) window the
> input or (b) window in the SQL query.
>
> Option (a):
>
>   input
>       | "Window" >> beam.WindowInto(window.FixedWindows(30))
>       | "Aggregate" >>
>       SqlTransform("""SELECT field, COUNT(field) FROM PCOLLECTION
>                       WHERE ...
>                       GROUP BY field
>                    """)
>
> This results in an exception:
>
>   Caused by: java.lang.ClassCastException:
>   org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast
>   to org.apache.beam.sdk.transforms.windowing.GlobalWindow
>
> => Is this a bug?
>
>
> Let's try Option (b):
>
>   input
>       | "Aggregate & Window" >>
>       SqlTransform("""SELECT field, COUNT(field) FROM PCOLLECTION
>                       WHERE ...
>                       GROUP BY field,
>                                TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
>                    """)
>
> The issue that I'm facing here is that the timestamp is already assigned
> to my values but is not exposed as a field. So I need to use a DoFn to
> extract the timestamp as a new field:
>
>   class GetTimestamp(beam.DoFn):
>     def process(self, event, timestamp=beam.DoFn.TimestampParam):
>       yield TimestampedRow(..., timestamp)
>
>   input
>       | "Extract timestamp" >>
>       beam.ParDo(GetTimestamp())
>       | "Aggregate & Window" >>
>       SqlTransform("""SELECT field, COUNT(field) FROM PCOLLECTION
>                       WHERE ...
>                       GROUP BY field,
>                                TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
>                    """)
>
> => It would be very convenient if there were a reserved field name that
> pointed to the timestamp of an element. Maybe there is?
>
>
> -Max
>
>
> [1]
>
> https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/
>
