I think it would be even easier for other sources. PubSub is a tricky one (for us at
least) because Dataflow overrides the Beam-native PubSub source with
something different. Kafka is a pure Beam source.

On Thu, May 10, 2018 at 1:39 PM Ismaël Mejía <ieme...@gmail.com> wrote:

> Hi, jumping in a bit late to this discussion. This sounds super nice, but I
> could not access the document.
> How hard would it be to do this for other 'unbounded' sources, e.g. Kafka?
> On Sat, May 5, 2018 at 2:56 AM Andrew Pilloud <apill...@google.com> wrote:
>
> > I don't think we should jump to adding an extension, but TBLPROPERTIES is
> already a DDL extension and it isn't user friendly. We should strive for a
> world where no one needs to use it. SQL needs the timestamp to be exposed
> as a column; we can't hide it without changing the definition of GROUP BY.
> I like Anton's proposal of adding it as an annotation in the column
> definition. That seems even simpler and more user friendly. We might even
> be able to get away with using the PRIMARY KEY keyword.
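>
> > A rough sketch of what that column annotation could look like (the
> > EVENT_TIMESTAMP keyword and the table options here are illustrative,
> > not an agreed syntax):
> >
> >   CREATE TABLE pubsub_events (
> >     event_ts TIMESTAMP EVENT_TIMESTAMP, -- hypothetical annotation
> >     payload VARCHAR
> >   )
> >   TYPE 'pubsub'
> >   LOCATION 'projects/my-project/topics/my-topic'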
>
> > Andrew
>
> > On Fri, May 4, 2018 at 12:11 PM Anton Kedin <ke...@google.com> wrote:
>
> >> There are a few aspects to the event timestamp definition in SQL that we
> are talking about here:
>
> >> configuring the source. E.g. for PubsubIO you can choose whether to
> extract the event timestamp from the message attributes or the message
> publish time:
>
> >> this is source-specific and cannot be part of the common DDL;
> >> TBLPROPERTIES, on the other hand, is an opaque JSON blob which exists
> specifically for source configuration;
> >> as Kenn is saying, some sources might not even have such configuration;
> >> at processing time, the event timestamp is available in
> ProcessContext.timestamp() regardless of the specifics of the source
> configuration, so it can be extracted the same way for all sources, as
> Raghu said;
>
> >> designating one of the table columns as an event timestamp:
>
> >> the query needs to be able to reference the event timestamp, so we have
> to declare which column to populate with it;
> >> this is common for all sources and we can create a special syntax, e.g.
> "columnName EVENT_TIMESTAMP" (see the sketch after this list). It must not
> contain source-specific configuration at this point, in my opinion;
> >> when SQL knows which column is supposed to be the timestamp, it can
> get it from ProcessContext.timestamp() and put it into the designated
> field the same way regardless of the source configuration;
>
> >> pubsub-specific message formatting:
>
> >> on top of the above we want to be able to expose pubsub message
> attributes, payload, and timestamp to user queries, and do it without
> magic or user schema modifications. To do this we can enforce some
> pubsub-specific schema limitations, e.g. by exposing the attributes and
> timestamp fields at the top level of the schema, with the payload going
> into the second level in its own field;
> >> this aspect is not fully implementable until we have support for complex
> types; until then we cannot map the full JSON to the payload field;
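>
> >> Putting the first two aspects together, a declaration could look roughly
> like this (a sketch only; the EVENT_TIMESTAMP annotation and the
> "timestampAttributeKey" property name are illustrative, not final syntax):
>
> >>   CREATE TABLE pubsub_messages (
> >>     event_ts TIMESTAMP EVENT_TIMESTAMP, -- filled from ProcessContext.timestamp()
> >>     payload VARCHAR
> >>   )
> >>   TYPE 'pubsub'
> >>   LOCATION 'projects/my-project/topics/my-topic'
> >>   TBLPROPERTIES '{"timestampAttributeKey": "ts"}' -- source-specific config stays here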
>
> >> I will update the doc and the implementation to reflect these comments
> where possible.
>
> >> Thank you,
> >> Anton
>
>
> >> On Fri, May 4, 2018 at 9:48 AM Raghu Angadi <rang...@google.com> wrote:
>
> >>> On Thu, May 3, 2018 at 12:47 PM Anton Kedin <ke...@google.com> wrote:
>
> >>>> I think it makes sense for the case when the timestamp is provided in
> the payload (including pubsub message attributes). We can mark the field as
> an event timestamp. But if the timestamp is internally defined by the source
> (pubsub message publish time) and not exposed in the event body, then we
> need a source-specific mechanism to extract and map the event timestamp to
> the schema. This is, of course, if we don't automatically add a magic
> timestamp field that Beam SQL can populate behind the scenes and add to
> the schema. I want to avoid this magic path for now.
>
>
> >>> Commented on the PR. As Kenn mentioned, every element in Beam has an
> event timestamp, so there is no requirement for the SQL transform to
> extract the timestamp itself. Using the element timestamp takes care of the
> Pubsub publish timestamp as well (in fact, this is the default when a
> timestamp attribute is not specified in PubsubIO).
>
> >>> How timestamps are customized is specific to each source, so custom
> timestamp options seem like they belong in TBLPROPERTIES. E.g. for
> KafkaIO, it could specify "logAppendTime", "createTime", or
> "processingTime", etc. (though I am not sure how a user could provide their
> own custom extractor in Beam SQL; maybe it could support a timestamp field
> in JSON records).
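>
> >>> For example, a Kafka declaration might carry the timestamp policy in
> TBLPROPERTIES roughly like this (the "timestampPolicy" key, the LOCATION
> format, and the column list are illustrative only):
>
> >>>   CREATE TABLE kafka_events (
> >>>     event_ts TIMESTAMP,
> >>>     payload VARCHAR
> >>>   )
> >>>   TYPE 'kafka'
> >>>   LOCATION 'my_topic'
> >>>   TBLPROPERTIES '{"timestampPolicy": "logAppendTime"}'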
>
> >>> Raghu.
>
>
> >>>> On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud <apill...@google.com>
> wrote:
>
> >>>>> This sounds awesome!
>
> >>>>> Is event timestamp something that we need to specify for every
> source? If so, I would suggest we add this as a first-class option on
> CREATE TABLE rather than something hidden in TBLPROPERTIES.
>
> >>>>> Andrew
>
> >>>>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin <ke...@google.com>
> wrote:
>
> >>>>>> Hi
>
> >>>>>> I am working on adding functionality to support querying Pubsub
> messages directly from Beam SQL.
>
> >>>>>> Goal
> >>>>>>    Provide Beam users with a pure SQL solution for creating pipelines
> with Pubsub as a data source, without needing to set up the pipeline in
> Java before applying the query.
>
> >>>>>> High level approach
>
> >>>>>> Build on top of PubsubIO;
> >>>>>> the Pubsub source will be declared using a CREATE TABLE DDL statement:
>
> >>>>>> Beam SQL already supports declaring sources like Kafka and Text
> using CREATE TABLE DDL;
> >>>>>> it supports additional configuration using the TBLPROPERTIES clause,
> which currently takes a text blob where we can put a JSON configuration;
> >>>>>> wrapping PubsubIO into a similar source looks feasible;
>
> >>>>>> The plan is to initially support only messages with a JSON payload:
>
> >>>>>> more payload formats can be added later;
>
> >>>>>> Messages will be fully described in the CREATE TABLE statements:
>
> >>>>>> event timestamps: the source of the timestamp is configurable; Beam
> SQL requires an explicit timestamp column for windowing support;
> >>>>>> message attributes map;
> >>>>>> JSON payload schema;
>
> >>>>>> Event timestamps will be taken either from the publish time or from a
> user-specified message attribute (configurable); see the sketch below;
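>
> >>>>>> A rough sketch of the resulting DDL (the column names and the exact
> TBLPROPERTIES key are illustrative, not final):
>
> >>>>>>   CREATE TABLE pubsub_topic (
> >>>>>>     event_timestamp TIMESTAMP, -- designated event timestamp column
> >>>>>>     id INTEGER,                -- fields of the JSON payload
> >>>>>>     name VARCHAR
> >>>>>>   )
> >>>>>>   TYPE 'pubsub'
> >>>>>>   LOCATION 'projects/my-project/topics/my-topic'
> >>>>>>   TBLPROPERTIES '{"timestampAttributeKey": "eventTime"}'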
>
> >>>>>> Thoughts, ideas, comments?
>
> >>>>>> More details are in the doc here:
>
> https://docs.google.com/document/d/1wIXTxh-nQ3u694XbF0iEZX_7-b3yi4ad0ML2pcAxYfE
>
> >>>>>> Thank you,
> >>>>>> Anton
>
