Thanks for the quick reply, Brian! I've filed a JIRA for option (a): https://jira.apache.org/jira/browse/BEAM-10143
Makes sense to define DATETIME as a logical type. I'll check out your PR.

We could work around this for now by doing a cast in the query, e.g.:

  TUMBLE(CAST(f_timestamp AS DATETIME), INTERVAL '30' MINUTE)

Note that we may have to do a more sophisticated cast to convert the
Python micros into a DATETIME (see the rough sketch at the bottom of
this mail).

-Max

On 28.05.20 19:18, Brian Hulette wrote:
> Hey Max,
> Thanks for kicking the tires on SqlTransform in Python :)
>
> We don't have any tests of windowing and Sql in Python yet, so I'm not
> that surprised you're running into issues here. Portable schemas don't
> support the DATETIME type, because we decided not to define it as one of
> the atomic types [1] and hope to add support via a logical type instead
> (see BEAM-7554 [2]). This was the motivation for the MillisInstant PR I
> put up, and the ongoing discussion [3].
> Regardless, that should only be an obstacle for option (b), where you'd
> need to have a DATETIME in the input and/or output PCollection of the
> SqlTransform. In theory option (a) should be possible, so I'd consider
> that a bug - can you file a jira for it?
>
> Brian
>
> [1] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto#L58
> [2] https://issues.apache.org/jira/browse/BEAM-7554
> [3] https://lists.apache.org/thread.html/r2e05355b74fb5b8149af78ade1e3539ec08371a9a4b2b9e45737e6be%40%3Cdev.beam.apache.org%3E
>
> On Thu, May 28, 2020 at 9:45 AM Maximilian Michels <m...@apache.org> wrote:
>
>     Hi,
>
>     I'm using the SqlTransform as an external transform from within a
>     Python pipeline. The SQL docs [1] mention that you can either
>     (a) window the input or (b) window in the SQL query.
>
>     Option (a):
>
>       input
>           | "Window" >> beam.WindowInto(window.FixedWindows(30))
>           | "Aggregate" >> SqlTransform("""Select field, count(field) from PCOLLECTION
>                                             WHERE ...
>                                             GROUP BY field
>                                          """)
>
>     This results in an exception:
>
>       Caused by: java.lang.ClassCastException:
>         org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast
>         to org.apache.beam.sdk.transforms.windowing.GlobalWindow
>
>     => Is this a bug?
>
>
>     Let's try Option (b):
>
>       input
>           | "Aggregate & Window" >> SqlTransform("""Select field, count(field) from PCOLLECTION
>                                                     WHERE ...
>                                                     GROUP BY field,
>                                                       TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
>                                                  """)
>
>     The issue that I'm facing here is that the timestamp is already
>     assigned to my values but is not exposed as a field. So I need to use
>     a DoFn to extract the timestamp as a new field:
>
>       class GetTimestamp(beam.DoFn):
>           def process(self, event, timestamp=beam.DoFn.TimestampParam):
>               yield TimestampedRow(..., timestamp)
>
>       input
>           | "Extract timestamp" >> beam.ParDo(GetTimestamp())
>           | "Aggregate & Window" >> SqlTransform("""Select field, count(field) from PCOLLECTION
>                                                     WHERE ...
>                                                     GROUP BY field,
>                                                       TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
>                                                  """)
>
>     => It would be very convenient if there was a reserved field name
>     which would point to the timestamp of an element. Maybe there is?
>
>
>     -Max
>
>
>     [1] https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/
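
P.S. To make the workaround above a bit more concrete, here is a rough,
untested sketch of how option (b) could look with the cast pushed into the
query. The row type EventRow, the dummy input data and the epoch-millis
encoding of f_timestamp are only assumptions for illustration; as noted
above, the conversion from the Python timestamp may need to be more
elaborate than a plain CAST.

  import typing

  import apache_beam as beam
  from apache_beam import coders
  from apache_beam.transforms import window
  from apache_beam.transforms.sql import SqlTransform

  # Placeholder row type which exposes the element timestamp as a field.
  class EventRow(typing.NamedTuple):
      field: str
      f_timestamp: int  # assumed to be epoch millis here

  coders.registry.register_coder(EventRow, coders.RowCoder)

  class GetTimestamp(beam.DoFn):
      def process(self, event, timestamp=beam.DoFn.TimestampParam):
          # Copy the element timestamp into a schema field so that the
          # SQL query can window on it.
          yield EventRow(field=event, f_timestamp=timestamp.micros // 1000)

  with beam.Pipeline() as p:
      result = (
          p
          # Dummy events, only for illustration.
          | "Create" >> beam.Create([("a", 1590000000), ("b", 1590000900)])
          | "Assign timestamps" >> beam.Map(
              lambda kv: window.TimestampedValue(kv[0], kv[1]))
          | "Extract timestamp" >> beam.ParDo(
              GetTimestamp()).with_output_types(EventRow)
          # The CAST below may have to be replaced by a proper conversion,
          # depending on how f_timestamp ends up being encoded.
          | "Aggregate & Window" >> SqlTransform("""
                SELECT field, COUNT(field) AS cnt
                FROM PCOLLECTION
                GROUP BY field,
                    TUMBLE(CAST(f_timestamp AS DATETIME), INTERVAL '30' MINUTE)
            """)
          | "Print" >> beam.Map(print))

The with_output_types(EventRow) call plus the registered RowCoder are what
turn the ParDo output into a schema'd PCollection, which SqlTransform needs
on its input.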