Hi, I'm using the SqlTransform as an external transform from within a Python pipeline. The SQL docs [1] mention that you can either (a) window the input or (b) window in the SQL query.
Option (a): window the input first:

    input \
        | "Window" >> beam.WindowInto(window.FixedWindows(30)) \
        | "Aggregate" >> SqlTransform("""
              SELECT field, COUNT(field)
              FROM PCOLLECTION
              WHERE ...
              GROUP BY field""")

This results in an exception:

    Caused by: java.lang.ClassCastException:
    org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast to
    org.apache.beam.sdk.transforms.windowing.GlobalWindow

=> Is this a bug?

Let's try Option (b): window in the SQL query:

    input \
        | "Aggregate & Window" >> SqlTransform("""
              SELECT field, COUNT(field)
              FROM PCOLLECTION
              WHERE ...
              GROUP BY field, TUMBLE(f_timestamp, INTERVAL '30' MINUTE)""")

The issue I'm facing here is that the timestamp is already assigned to my values, but it is not exposed as a field. So I need a DoFn to extract the timestamp into a new field:

    class GetTimestamp(beam.DoFn):
        def process(self, event, timestamp=beam.DoFn.TimestampParam):
            yield TimestampedRow(..., timestamp)

    input \
        | "Extract timestamp" >> beam.ParDo(GetTimestamp()) \
        | "Aggregate & Window" >> SqlTransform("""
              SELECT field, COUNT(field)
              FROM PCOLLECTION
              WHERE ...
              GROUP BY field, TUMBLE(f_timestamp, INTERVAL '30' MINUTE)""")

=> It would be very convenient if there were a reserved field name which points to the timestamp of an element. Maybe there is?

-Max

[1] https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/
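P.S. For anyone reading along: a plain-Python sketch of what TUMBLE(f_timestamp, INTERVAL '30' MINUTE) effectively does per element, namely aligning each timestamp to the start of a fixed 30-minute window so that GROUP BY can bucket on it. The function name tumble_start is mine, not a Beam API:

    from datetime import datetime, timedelta, timezone

    def tumble_start(ts: datetime, size: timedelta) -> datetime:
        # Align the timestamp to the start of its fixed (tumbling) window,
        # measured from the Unix epoch.
        epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
        return ts - (ts - epoch) % size

    # Two events 10 minutes apart land in the same 30-minute window:
    t1 = datetime(2021, 5, 4, 12, 10, tzinfo=timezone.utc)
    t2 = datetime(2021, 5, 4, 12, 20, tzinfo=timezone.utc)
    assert tumble_start(t1, timedelta(minutes=30)) \
        == tumble_start(t2, timedelta(minutes=30)) \
        == datetime(2021, 5, 4, 12, 0, tzinfo=timezone.utc)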