Hi,

I'm using the SqlTransform as an external transform from within a Python
pipeline. The SQL docs [1] mention that you can either (a) window the
input or (b) window in the SQL query.
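
For context, the input PCollection is schema'd (a NamedTuple registered with a
RowCoder) and the elements already carry event timestamps assigned upstream. A
stripped-down stand-in for how it is built (the row type, field names, and data
here are simplified placeholders, not my real pipeline):

  import typing

  import apache_beam as beam
  from apache_beam.transforms import window

  # Simplified stand-in for my real row type; "field" is a placeholder name.
  class MyRow(typing.NamedTuple):
    field: str

  # Register a RowCoder so SqlTransform sees a schema'd PCollection.
  beam.coders.registry.register_coder(MyRow, beam.coders.RowCoder)

  p = beam.Pipeline()
  input = (
      p
      | "Create" >> beam.Create([("a", 1612345678), ("b", 1612345700)])
      | "ToRow" >> beam.Map(
          lambda kv: window.TimestampedValue(MyRow(field=kv[0]), kv[1])
      ).with_output_types(MyRow))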

Option (a):

  input
      | "Window >> beam.WindowInto(window.FixedWindows(30))
      | "Aggregate" >>
      SqlTransform("""Select field, count(field) from PCOLLECTION
                      WHERE ...
                      GROUP BY field
                   """)

This results in an exception:

  Caused by: java.lang.ClassCastException:
  org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast
  to org.apache.beam.sdk.transforms.windowing.GlobalWindow

=> Is this a bug?


Let's try Option (b):

  input
      | "Aggregate & Window" >>
      SqlTransform("""Select field, count(field) from PCOLLECTION
                      WHERE ...
                      GROUP BY field,
                               TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
                   """)

The issue that I'm facing here is that the timestamp is already assigned
to my values but is not exposed as a field. So I need to use a DoFn to
extract the timestamp as a new field:

  class GetTimestamp(beam.DoFn):
    def process(self, event, timestamp=beam.DoFn.TimestampParam):
      yield TimestampedRow(..., timestamp)

  input
      | "Extract timestamp" >>
      beam.ParDo(GetTimestamp()).with_output_types(TimestampedRow)
      | "Aggregate & Window" >>
      SqlTransform("""Select field, count(field) from PCOLLECTION
                      WHERE ...
                      GROUP BY field,
                               TUMBLE(f_timestamp, INTERVAL '30' MINUTE)
                   """)

=> It would be very convenient if there were a reserved field name that
exposed the timestamp of an element. Maybe there is one?
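
To illustrate, I'd like to be able to write something like the following,
where the __event_timestamp__ column name is made up and, as far as I can
tell, doesn't exist:

  input
      | "Aggregate & Window" >>
      SqlTransform("""SELECT field, COUNT(field) FROM PCOLLECTION
                      WHERE ...
                      GROUP BY field,
                               TUMBLE(__event_timestamp__, INTERVAL '30' MINUTE)
                   """)

That would make the extra timestamp-extraction ParDo unnecessary.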


-Max


[1]
https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/
