Thanks for the quick reply Brian! I've filed a JIRA for option (a):
https://jira.apache.org/jira/browse/BEAM-10143

Makes sense to define DATETIME as a logical type. I'll check out your
PR. We could work around this for now by doing a cast, e.g.:

  TUMBLE(CAST(f_timestamp AS DATETIME), INTERVAL '30' MINUTE)

Note that we may have to do a more sophisticated cast to convert the
Python micros into a DATETIME.

-Max

On 28.05.20 19:18, Brian Hulette wrote:
> Hey Max,
> Thanks for kicking the tires on SqlTransform in Python :)
> 
> We don't have any tests of windowing and Sql in Python yet, so I'm not
> that surprised you're running into issues here. Portable schemas don't
> support the DATETIME type, because we decided not to define it as one of
> the atomic types [1] and hope to add support via a logical type instead
> (see BEAM-7554 [2]). This was the motivation for the MillisInstant PR I
> put up, and the ongoing discussion [3].
> Regardless, that should only be an obstacle for option (b), where you'd
> need to have a DATETIME in the input and/or output PCollection of the
> SqlTransform. In theory option (a) should be possible, so I'd consider
> that a bug - can you file a jira for it?
> 
> Brian
> 
> [1] 
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto#L58
> [2] https://issues.apache.org/jira/browse/BEAM-7554
> [3] 
> https://lists.apache.org/thread.html/r2e05355b74fb5b8149af78ade1e3539ec08371a9a4b2b9e45737e6be%40%3Cdev.beam.apache.org%3E
> 
> On Thu, May 28, 2020 at 9:45 AM Maximilian Michels <m...@apache.org
> <mailto:m...@apache.org>> wrote:
> 
>     Hi,
> 
>     I'm using the SqlTransform as an external transform from within a Python
>     pipeline. The SQL docs [1] mention that you can either (a) window the
>     input or (b) window in the SQL query.
> 
>     Option (a):
> 
>       input
>           | "Window" >> beam.WindowInto(window.FixedWindows(30))
>           | "Aggregate" >>
>           SqlTransform("""Select field, count(field) from PCOLLECTION
>                           WHERE ...
>                           GROUP BY field
>                        """)
> 
>     This results in an exception:
> 
>       Caused by: java.lang.ClassCastException:
>       org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast
>       to org.apache.beam.sdk.transforms.windowing.GlobalWindow
> 
>     => Is this a bug?
> 
> 
>     Let's try Option (b):
> 
>       input
>           | "Aggregate & Window" >>
>           SqlTransform("""Select field, count(field) from PCOLLECTION
>                           WHERE ...
>                           GROUP BY field,
>                                    TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
>                        """)
> 
>     The issue that I'm facing here is that the timestamp is already assigned
>     to my values but is not exposed as a field. So I need to use a DoFn to
>     extract the timestamp as a new field:
> 
>       class GetTimestamp(beam.DoFn):
>         def process(self, event, timestamp=beam.DoFn.TimestampParam):
>           yield TimestampedRow(..., timestamp)
> 
>       input
>           | "Extract timestamp" >>
>           beam.ParDo(GetTimestamp())
>           | "Aggregate & Window" >>
>           SqlTransform("""Select field, count(field) from PCOLLECTION
>                           WHERE ...
>                           GROUP BY field,
>                                    TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
>                        """)
> 
>     => It would be very convenient if there were a reserved field name that
>     pointed to the timestamp of an element. Maybe there is?
> 
> 
>     -Max
> 
> 
>     [1]
>     
> https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/
> 
