I think it would be even easier for other sources. PubSub is a tricky one (for us at least) because Dataflow overrides the Beam native PubSub source with something different. Kafka is a pure Beam source.
On Thu, May 10, 2018 at 1:39 PM Ismaël Mejía <ieme...@gmail.com> wrote:

> Hi,
>
> Jumping a bit late to this discussion. This sounds super nice, but I
> could not access the document.
>
> How hard would it be to do this for other 'unbounded' sources, e.g.
> Kafka?
>
> On Sat, May 5, 2018 at 2:56 AM Andrew Pilloud <apill...@google.com> wrote:
>
>> I don't think we should jump to adding an extension, but TBLPROPERTIES
>> is already a DDL extension and it isn't user friendly. We should strive
>> for a world where no one needs to use it. SQL needs the timestamp to be
>> exposed as a column; we can't hide it without changing the definition
>> of GROUP BY. I like Anton's proposal of adding it as an annotation in
>> the column definition. That seems even simpler and more user friendly.
>> We might even be able to get away with using the PRIMARY KEY keyword.
>>
>> Andrew
>>
>> On Fri, May 4, 2018 at 12:11 PM Anton Kedin <ke...@google.com> wrote:
>>
>>> There are a few aspects of the event timestamp definition in SQL which
>>> we are talking about here:
>>>
>>> Configuring the source. E.g. for PubsubIO you can choose whether to
>>> extract the event timestamp from the message attributes or from the
>>> message publish time:
>>> - this is source-specific and cannot be part of the common DDL;
>>> - TBLPROPERTIES, on the other hand, is an opaque JSON blob which
>>>   exists specifically for source configuration;
>>> - as Kenn is saying, some sources might not even have such
>>>   configuration;
>>> - at processing time, the event timestamp is available in
>>>   ProcessContext.timestamp() regardless of the specifics of the source
>>>   configuration, so it can be extracted the same way for all sources,
>>>   as Raghu said.
>>>
>>> Designating one of the table columns as an event timestamp:
>>> - the query needs to be able to reference the event timestamp, so we
>>>   have to declare which column to populate with it;
>>> - this is common to all sources, and we can create a special syntax
>>>   for it, e.g. "columnName EVENT_TIMESTAMP". It must not contain
>>>   source-specific configuration at this point, in my opinion;
>>> - once SQL knows which column is supposed to be the timestamp, it can
>>>   get it from ProcessContext.timestamp() and put it into the
>>>   designated field the same way regardless of the source
>>>   configuration.
>>>
>>> Pubsub-specific message formatting:
>>> - on top of the above, we want to be able to expose pubsub message
>>>   attributes, payload, and timestamp to user queries, and do it
>>>   without magic or user schema modifications. To do this we can
>>>   enforce some pubsub-specific schema limitations, e.g. by exposing
>>>   the attributes and timestamp fields at the top level of the schema,
>>>   with the payload going into the second level in its own field;
>>> - this aspect is not fully implementable until we have support for
>>>   complex types. Until then we cannot map full JSON to the payload
>>>   field.
>>>
>>> I will update the doc and the implementation to reflect these comments
>>> where possible.
>>>
>>> Thank you,
>>> Anton
>>>
>>> On Fri, May 4, 2018 at 9:48 AM Raghu Angadi <rang...@google.com> wrote:
>>>
>>>> On Thu, May 3, 2018 at 12:47 PM Anton Kedin <ke...@google.com> wrote:
>>>>
>>>>> I think it makes sense for the case when the timestamp is provided
>>>>> in the payload (including pubsub message attributes). We can mark
>>>>> the field as an event timestamp. But if the timestamp is internally
>>>>> defined by the source (pubsub message publish time) and not exposed
>>>>> in the event body, then we need a source-specific mechanism to
>>>>> extract the event timestamp and map it to the schema. This is, of
>>>>> course, if we don't automatically add a magic timestamp field which
>>>>> Beam SQL can populate behind the scenes and add to the schema. I
>>>>> want to avoid this magic path for now.
>>>>
>>>> Commented on the PR. As Kenn mentioned, every element in Beam has an
>>>> event timestamp; there is no requirement for the SQL transform to
>>>> extract the timestamp itself. Using the element timestamp takes care
>>>> of the Pubsub publish timestamp as well (in fact, this is the default
>>>> when a timestamp attribute is not specified in PubsubIO).
>>>>
>>>> How timestamps are customized is specific to each source, so custom
>>>> timestamp options seem like they belong in TBLPROPERTIES. E.g. for
>>>> KafkaIO it could specify "logAppendTime", "createTime", or
>>>> "processingTime", etc. (though I am not sure how a user can provide
>>>> their own custom extractor in Beam SQL; maybe it could support a
>>>> timestamp field in JSON records).
>>>>
>>>> Raghu.
>>>>
>>>>> On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud
>>>>> <apill...@google.com> wrote:
>>>>>
>>>>>> This sounds awesome!
>>>>>>
>>>>>> Is event timestamp something that we need to specify for every
>>>>>> source? If so, I would suggest we add this as a first-class option
>>>>>> on CREATE TABLE rather than something hidden in TBLPROPERTIES.
>>>>>>
>>>>>> Andrew
>>>>>>
>>>>>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am working on adding functionality to support querying Pubsub
>>>>>>> messages directly from Beam SQL.
>>>>>>>
>>>>>>> Goal: provide Beam users a pure SQL solution to create pipelines
>>>>>>> with Pubsub as a data source, without the need to set up the
>>>>>>> pipelines in Java before applying the query.
>>>>>>>
>>>>>>> High-level approach:
>>>>>>> - Build on top of PubsubIO.
>>>>>>> - The Pubsub source will be declared using a CREATE TABLE DDL
>>>>>>>   statement:
>>>>>>>   - Beam SQL already supports declaring sources like Kafka and
>>>>>>>     Text using CREATE TABLE DDL;
>>>>>>>   - it supports additional configuration using a TBLPROPERTIES
>>>>>>>     clause. Currently it takes a text blob, where we can put a
>>>>>>>     JSON configuration;
>>>>>>>   - wrapping PubsubIO into a similar source looks feasible.
>>>>>>> - The plan is to initially support messages only with a JSON
>>>>>>>   payload:
>>>>>>>   - more payload formats can be added later.
>>>>>>> - Messages will be fully described in the CREATE TABLE statements:
>>>>>>>   - event timestamps. The source of the timestamp is configurable.
>>>>>>>     Beam SQL requires an explicit timestamp column for windowing
>>>>>>>     support;
>>>>>>>   - message attributes map;
>>>>>>>   - JSON payload schema.
>>>>>>> - Event timestamps will be taken either from the publish time or
>>>>>>>   from a user-specified message attribute (configurable).
>>>>>>>
>>>>>>> Thoughts, ideas, comments?
>>>>>>>
>>>>>>> More details are in the doc here:
>>>>>>> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Anton
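
To make the thread concrete, here is a rough sketch of the kind of DDL being discussed, combining Anton's EVENT_TIMESTAMP column annotation with a TBLPROPERTIES JSON blob for source-specific configuration. The annotation placement, the schema layout, the table and column names, and the "timestampAttributeKey" property name are illustrative assumptions based on the proposal, not final syntax:

    CREATE TABLE pubsub_messages (
      -- column designated as the event timestamp via the proposed annotation
      event_timestamp TIMESTAMP EVENT_TIMESTAMP,
      -- Pubsub message attributes exposed at the top level of the schema
      attributes MAP<VARCHAR, VARCHAR>,
      -- JSON payload schema nested one level down in its own field
      -- (requires the complex-type support mentioned in the thread)
      payload ROW<user_id BIGINT, action VARCHAR>
    )
    TYPE 'pubsub'
    LOCATION 'projects/my-project/topics/my-topic'
    -- source-specific config: read the event timestamp from the 'ts'
    -- attribute instead of the publish time (property name hypothetical)
    TBLPROPERTIES '{ "timestampAttributeKey": "ts" }';

With the timestamp exposed as an ordinary column, a query could window on it directly, e.g. GROUP BY TUMBLE(event_timestamp, INTERVAL '1' MINUTE), which is why Andrew notes it cannot be hidden without changing the definition of GROUP BY.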
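Similarly, Raghu's point about keeping timestamp customization in TBLPROPERTIES could cover Ismaël's Kafka question without new DDL syntax: the timestamp column keeps the same common annotation, while the source-specific policy lives in the JSON blob. The "timestamp.policy" key below is hypothetical; the values mirror the Kafka timestamp types Raghu named:

    CREATE TABLE kafka_events (
      event_timestamp TIMESTAMP EVENT_TIMESTAMP,  -- same annotation as above
      payload ROW<user_id BIGINT, action VARCHAR>
    )
    TYPE 'kafka'
    LOCATION 'my-broker:9092/my-topic'
    -- choose among Kafka's timestamp semantics; "createTime" and
    -- "processingTime" would be the other values mentioned in the thread
    TBLPROPERTIES '{ "timestamp.policy": "logAppendTime" }';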