Thanks for the replies.
Regarding:
"'It might be sometimes that this is not explicit to be guessed' That is
why I added the RelTimeConverter. After this conversion step it should
be as explicit as possible (by using the special types). And we can add
special handling of functions (e.g. ceil) that preserve the monotonicity."
Maybe I am missing something, so sorry if I am bothering you for nothing (this is
just to make sure we think of all cases beforehand). I have seen examples of
applications where you have multiple fields of the same type. For example, an
event can have 3 time fields of type TIMESTAMP, 1 of type DATE and 2 of type TIME
(this is actually from a real application with some sort of standard communication
schema). I was referring to such cases, where it is unclear to me how the code
will identify the exact field to use as rowtime. This is what I
meant when I asked how we pass indicators to spot the rowtime field, as well as
what would happen with the code in such a situation, since it can identify multiple
time fields.
Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München
E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173
HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI,
which is intended only for the person or entity whose address is listed above.
Any use of the information contained herein in any way (including, but not
limited to, total or partial disclosure, reproduction, or dissemination) by
persons other than the intended recipient(s) is prohibited. If you receive this
e-mail in error, please notify the sender by phone or email immediately and
delete it!
-----Original Message-----
From: Timo Walther [mailto:twal...@apache.org]
Sent: Monday, March 20, 2017 12:00 PM
To: Radu Tudoran
Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time
Hi Radu,
you are right. It is hard to give line specific comments without having
a PR. I will open a work-in-progress PR.
Regarding the visit functions: Yes you are right. We will need to define
how time is handled for every new operator and override/implement visit
functions for it.
The RexTimeIndicatorMaterializer wraps concrete time accesses in a
materialization function. This changes the field type to a regular
timestamp. The CodeGenerator then knows that it needs to access the
getTimestamp() method of the ProcessFunction.
Sorry for the bad documentation. This code is still work-in-progress,
that's why I didn't add many comments so far.
The RexInputRefUpdater basically updates the references when a logical
row type is converted to a physical row type. I haven't written tests for
this code yet, so there might be bugs in it.
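To make the index arithmetic concrete, here is a minimal sketch (not code from the prototype). It assumes that time indicators exist only in the logical row type, so a reference to a regular field moves left by the number of time indicators that precede it. The names `Field` and `physicalIndex` are hypothetical and used only for illustration.

```scala
object InputRefSketch {
  // Hypothetical model of a logical field. isTimeIndicator marks fields
  // that exist only in the logical row type and not in the physical record.
  final case class Field(name: String, isTimeIndicator: Boolean)

  // Maps an index in the logical row type to an index in the physical row type.
  // Returns None for out-of-range indexes and for time indicators themselves,
  // because (under this sketch's assumption) they have no physical position.
  def physicalIndex(logical: Seq[Field], logicalIndex: Int): Option[Int] = {
    val valid = logicalIndex >= 0 && logicalIndex < logical.length
    if (!valid || logical(logicalIndex).isTimeIndicator) {
      None
    } else {
      // Count the time indicators that come before the referenced field.
      val countTimeIndicators = logical.take(logicalIndex).count(_.isTimeIndicator)
      Some(logicalIndex - countTimeIndicators)
    }
  }
}
```

With a logical row of (rowtime, int, string, proctime, x), the field `x` at logical index 4 ends up at physical index 2 in this model, which is why the offset is subtracted rather than added.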
We have to think about how we integrate join etc. in follow up issues.
What this code ensures is that operators know what is time and what is
just a "regular field". A join operator knows which side the time
comes from and which type it has (proctime or rowtime).
"It might be sometimes that this is not explicit to be guessed" That is
why I added the RelTimeConverter. After this conversion step it should
be as explicit as possible (by using the special types). And we can add
special handling of functions (e.g. ceil) that preserve the monotonicity.
"Also in the same class, the 2 function look the same" there is a
difference in the last argument ;-)
Yes, you are right this is a large change, but we should integrate it in
1.3 as it is also API breaking and a necessary concept for future PR.
The longer we wait, the harder it is to rebase it on top of all the
windows in the pipeline. But of course we need to discuss this with the
community.
Thanks for looking into the code!
Timo
On 20/03/17 at 11:28, Radu Tudoran wrote:
Hi Timo,
As I did not see a pull request I did not know exactly what is the best way to
give you feedback over the code...so as a last resort I thought to write you an
email. If you can recommend better tools for the future please do so.
In your class
org.apache.flink.table.api.scala.stream.table.AggregationsITCase.scala
=> I guess there are several other overrides of the visit function to be done for
other types of logical operators (join, window, ...). Does this mean that from now on,
when we implement a new operator, we need to add the corresponding implementation
here as well?
In the RexTimeIndicatorMaterializer class I am not sure I understand the logic
of materializing the time field. From my basic understanding this should mean
that if we had before materializing it 4 fields for example we need to add a
5th one for the time. It is a bit hard to follow at first glance how this is
implemented. If you could add some more comments, it would be great.
In RexInputRefUpdater.scala I would expect that you add the number of time fields
to be materialized to the input index rather than subtracting it:
new RexInputRef(inputRef.getIndex - countTimeIndicators, inputRef.getType)
My question is: shouldn't it be "+"? (I guess you tested it and it works, in
which case please take my remark more as a question about the logic of having
it with minus and not plus... maybe not really the kind of feedback you needed.)
=> new RexInputRef(inputRef.getIndex + countTimeIndicators, inputRef.getType)
Also, I see that you iterate over the input references to look for time. Is it
possible to have more than one? Can we have, or should we have, for example both
proctime and rowtime, or rowtime multiple times? Or perhaps this is for the
case of joins/unions where each side of the biRel comes with its own time. In
this case, how do you want to support this? Will you preserve both or create one
single new one that you preserve? (I would argue for this latter option, as
regardless of whether the events worked before on proctime or rowtime, after the
union/join a new event is created and the timestamp should be reflected as such,
based on the moment of creation.)
In FlinkTypeFactory.scala, in def createRowtimeIndicatorType():
Shouldn't you somehow access the field that is indicated to contain the rowtime
from the event? It might be sometimes that this is not explicit to be guessed.
Or do you make some implicit assumption that this is uniquely identifiable by
some indicators (name, type, ...)?
Also in the same class, the 2 functions look the same. Is this a mistake? I
would assume there must be some differentiation...
def createProctimeIndicatorType(): RelDataType = {
  val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
  new TimeIndicatorRelDataType(getTypeSystem,
    originalType.asInstanceOf[BasicSqlType], false)
}

def createRowtimeIndicatorType(): RelDataType = {
  val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
  new TimeIndicatorRelDataType(getTypeSystem,
    originalType.asInstanceOf[BasicSqlType], true)
}
In buildRowDataType I saw what I was expecting: fields corresponding to
proctime and rowtime are added to the event types. I am still a bit confused
about what was happening before :)
It looks like many ongoing modifications would be affected by this change. I
would propose to include it only after/right before the feature freeze, as
it would create a lot of rebasing and reimplementation for the features currently
in progress.
...not sure if this was helpful...
Dr. Radu Tudoran
-----Original Message-----
From: Timo Walther [mailto:twal...@apache.org]
Sent: Monday, March 20, 2017 10:48 AM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time
Hi Radu,
we differentiate rowtime and processing time fields by their field types. Both
indicators extend the timestamp type. In my prototype I added the functions
FlinkTypeFactory.isRowtime() and
FlinkTypeFactory.isProctime() for checking this. If a time indicator has been
materialized (e.g. long.cast(STRING)), it becomes a regular timestamp (or in
this case a string after evaluation). So we cannot differentiate between
rowtime and proctime anymore. However, we can add some exceptions for certain
functions (e.g. for ceil() in combination with windows) that preserve the time
attributes.
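As an illustration of this type-based distinction, here is a minimal sketch. It is not the prototype's code: `FieldType`, `TimeIndicator`, `materialize` and `timeSemantics` are hypothetical names, and the boolean flag mirrors the last constructor argument of TimeIndicatorRelDataType quoted elsewhere in this thread (true for rowtime, false for proctime).

```scala
object TimeTypeSketch {
  // Hypothetical, simplified field types.
  sealed trait FieldType
  case object RegularTimestamp extends FieldType
  case object StringType extends FieldType
  // A timestamp that additionally carries the time semantics.
  final case class TimeIndicator(isEventTime: Boolean) extends FieldType

  def isRowtime(t: FieldType): Boolean = t match {
    case TimeIndicator(true) => true
    case _                   => false
  }

  def isProctime(t: FieldType): Boolean = t match {
    case TimeIndicator(false) => true
    case _                    => false
  }

  // Materialization turns an indicator into a regular timestamp,
  // so the rowtime/proctime distinction is lost afterwards.
  def materialize(t: FieldType): FieldType = t match {
    case TimeIndicator(_) => RegularTimestamp
    case other            => other
  }

  // The kind of branching an operator could do on the field type.
  def timeSemantics(t: FieldType): String =
    if (isRowtime(t)) "event time"
    else if (isProctime(t)) "processing time"
    else "regular field"
}
```

In this model, `timeSemantics(TimeIndicator(true))` yields "event time", while the same call on `materialize(TimeIndicator(true))` yields "regular field".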
Count windows have to be defined over a time attribute. If you take a look at
the tests of org.apache.flink.table.api.scala.stream.table.AggregationsITCase,
you can see that countWindows are still supported as before. As I said, most
of the user-facing API does not change. It only tries to make time more
explicit.
Timo
On 20/03/17 at 10:34, Radu Tudoran wrote:
Hi Timo,
I have some questions regarding your implementation:
" The timestamp (not an indicator anymore) becomes part of the physical row.
E.g.
long.cast(STRING) would require a materialization "
=> If we have this, how are we going to differentiate between rowtime and
proctime? For supporting some queries/operators you only need to use these time
indications as markers, to have something like below. If you do not get access to
any sort of unique markers to indicate these, then we will have a hard time
supporting many implementations. What would be the option to support this condition
in your implementation?
if(rowtime)
...
else if(proctime)
...some other implementation
"- Windows are only valid if they work on time indicators."
=> Does this mean we can no longer work with count windows? There are a lot of
queries where windows would be defined based on cardinality of elements.
-----Original Message-----
From: Timo Walther [mailto:twal...@apache.org]
Sent: Monday, March 20, 2017 10:08 AM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time
Hi everyone,
for the last two weeks I worked on a solution for the time indicator issue. I
have implemented a prototype[1] which shows how we can express, track, and
access time in a consistent way for batch and stream tables.
Main changes of my current solution:
- Processing and rowtime time indicators can be named arbitrarily
- They can be defined as follows: stream.toTable(tEnv, 'long, 'int, 'string,
'proctime.proctime) or stream.toTable(tEnv, 'long.rowtime, 'int, 'string)
- In a streaming environment: if the "long" field is already defined in the record, it
will not be read by the runtime. "long" always represents the timestamp of the row.
- In batch environment: "long" must be present in the record and will be read
by the runtime.
- The table definition looks equivalent in both batch and streaming (better
unification than current state)
- Internally row types are split up in a logical and a physical row type.
- The logical row type contains time indicators, the physical row type never contains time
indicators (the pure "long" will never be in a record)
- After validation and query decorrelation, a special time indicator converter
traverses the RelNodes and analyzes whether a time indicator is accessed or only
forwarded.
- An access to a time indicator means that we need to materialize the rowtime
using a ProcessFunction (not yet implemented). The timestamp (not an indicator
anymore) becomes part of the physical row. E.g.
long.cast(STRING) would require a materialization
- Forwarding of time indicators does not materialize the rowtime. It remains a
logical attribute. E.g. .select('long)
- Windows are only valid if they work on time indicators.
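The distinction between accessing and forwarding a time indicator can be sketched as follows. This is only an illustration under simplifying assumptions, not the converter from the prototype: expressions are modeled with two hypothetical case classes, and any function call that takes a time indicator as an argument (directly or nested) counts as an access.

```scala
object MaterializationSketch {
  // Hypothetical, simplified expression tree.
  sealed trait Expr
  final case class FieldRef(name: String, isTimeIndicator: Boolean) extends Expr
  final case class Call(function: String, args: Seq[Expr]) extends Expr

  // A bare field reference only forwards the indicator (no materialization).
  // A call with a time indicator among its (possibly nested) arguments
  // accesses the value and therefore requires materialization.
  def accessesTimeIndicator(e: Expr): Boolean = e match {
    case FieldRef(_, _) => false
    case Call(_, args) =>
      args.exists {
        case FieldRef(_, isTime) => isTime
        case nested              => accessesTimeIndicator(nested)
      }
  }
}
```

In this model, `.select('long)` corresponds to `FieldRef("long", true)` and is only forwarded, while `long.cast(STRING)` corresponds to `Call("cast", Seq(FieldRef("long", true)))` and requires materialization. Exceptions such as ceil() in combination with windows are not modeled here.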
There are still a lot of open questions that we can discuss and/or fix in future
PRs. For now it would be great if you could give some feedback about the
current implementation. With some exceptions my branch can be built
successfully.
Regards,
Timo
[1] https://github.com/twalthr/flink/tree/FLINK-5884
On 02/03/17 at 07:22, jincheng sun wrote:
Hi,
@Timo, thanks for your reply, and congratulations on your work.
@Fabian, no matter how it is achieved, as long as the field attributes are
identified when the table is generated or created, that is what we want.
I think at this point we are on the same page. We can go ahead.
And I am very glad to hear that `the 'rowtime keyword would be removed`,
which is a very important step for keeping stream and batch consistent.
Best,
SunJincheng
2017-03-01 17:24 GMT+08:00 Fabian Hueske <fhue...@gmail.com>:
Hi,
@Xingcan
Yes that is right. It is not (easily) possible to change the
watermarks of a stream. All attributes which are used as event-time
timestamps must be aligned with these watermarks. These are only
attributes which are derived from the original rowtime attribute,
i.e., the one that was specified when the Table was created.
@SunJincheng
Regarding your points:
1. Watermarks can only be generated for (almost) sorted attributes.
Since a stream has only one sort order and cannot be sorted before it
is converted into Table, there will be hardly a case where n > 1 is
possible. The only possibility I see are two attributes which are in
almost the same order but with a certain distance (think of orderDate
and shipDate, but values would always be 1 day apart). However, this
requirement is very limiting and to be honest, I don't see how
assigning different watermarks for different attributes would work reliably in
practice.
The ORDER BY clause in an OVER window can only be used because the
stream is already sorted on that attribute (that's also why it is
restricted to rowtime and proctime in streaming)
2. Since a stream can only have one sort order, we so far assumed
that streams would already have watermarks and timestamps assigned. I
think this is a fair assumption, because a stream can only have one
order and hence only one timestamped & watermarked attribute (except
for the corner case I discussed above). As Timo said, .rowtime would
only add an attribute which refers to the already assigned timestamp of a row.
3. I completely agree that the difference between batch and streaming
should be overcome. This is actually the goal of Timo's work. So yes,
the 'rowtime keyword would be removed because any attribute can be
marked as event-time attribute (by calling 't.rowtime).
Btw. A table source could still make the watermark configurable by
offering a respective interface. However, I'm not yet convinced that
this needs to be part of the Table API.
What do you think?
Best, Fabian
2017-03-01 7:55 GMT+01:00 jincheng sun <sunjincheng...@gmail.com>:
Hi Fabian,
Thanks for your attention to this discussion. Let me share some
ideas about this. :)
1. Yes, the solution I have proposed can indeed be extended to
support multiple watermarks. A single watermark is a special case of
multiple watermarks (n = 1). I agree that, to keep the implementation
simple, we currently only support a single watermark. Our ideas are consistent.
BTW, I think even if we only use one attribute to generate the
watermark, we also need to sort, because in an OVER window (event-time)
we must know the exact data order. Is that right?
2. I think our difference is in how to register the watermark.
Now we see two ways:
A. t.rowtime:
If I understand correctly, in the current design, when we use
the expression 'rowtime, the system by default extracts timestamps
from the user data.
B. registeredWatermarks('t, waterMarkFunction1):
We explicitly register how to generate watermarks and
extract timestamps in user-defined ways.
These two ways are characterized as follows:
Approach A: The system by default extracts the value of the t field as a
timestamp, which is simple for the system.
Approach B: The user can define the logic of the timestamp extraction,
which is very flexible for the user. For example: if the field `t` is a
complex field (value: `xxx # 20170302111129 # yyy`), the user can
extract the timestamp (20170302111129) with custom logic.
So I tend towards approach B. What do you think?
3. We are very concerned about the unity of stream and batch, such as in the
current Table API:
Batch:
Table
.window(Tumble over 2.rows on 'long as 'w) // 'long is a normal field
.groupBy('w)
.select('int.count)
Stream:
Table
.window(Tumble over 5.milli on 'rowtime as 'w) // 'rowtime is the keyword
.groupBy('w)
.select('int.count)
As mentioned above, the two examples are both event-time aggregation
windows, but they are not written the same way: in batch we have a
specific column, while stream needs the 'rowtime keyword. I think we need to
try to eliminate this difference. What do you think?
In the current Google doc I see `table.window(tumble over 1.hour on 't
as 'w).groupBy('a, 'w).select('w.start, 'b.count)`. Does this mean that
FLINK-5884 will remove the Table API 'rowtime keyword?
So what I am asking about is the event-time indicators in SQL: if they are
column attributes registered with the table, does this mean that batch
and stream SQL are written and used in the same way?
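For the complex-field example under approach B in point 2 (`xxx # 20170302111129 # yyy`), user-defined extraction logic could look like the sketch below. This is only an illustration: `extractTimestamp` is a hypothetical helper, not part of any Flink API, and it assumes the middle segment is formatted as yyyyMMddHHmmss and interpreted as UTC.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import scala.util.Try

object TimestampExtractorSketch {
  // Assumed layout of the embedded timestamp, e.g. 20170302111129.
  private val format = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")

  // Extracts epoch milliseconds from a field of the form "xxx # <timestamp> # yyy".
  // Returns None if the field does not have exactly three '#'-separated parts
  // or if the middle part cannot be parsed.
  def extractTimestamp(field: String): Option[Long] =
    field.split("#").map(_.trim) match {
      case Array(_, ts, _) =>
        Try(LocalDateTime.parse(ts, format).toInstant(ZoneOffset.UTC).toEpochMilli).toOption
      case _ => None
    }
}
```

A watermark function registered for the column would then build on such an extractor; how that registration would look is exactly the open design question of this thread.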
I would very much appreciate your feedback.
Best,
SunJincheng
2017-03-01 10:40 GMT+08:00 Xingcan Cui <xingc...@gmail.com>:
Hi all,
I have a question about when `rowtime` is designated. The
current design does this during the DataStream to Table conversion.
Does this mean that `rowtime` is only valid for the source streams and cannot be
designated after a subquery? (That's why I considered using an alias
to dynamically designate it in SQL before.)
Best,
Xingcan
On Wed, Mar 1, 2017 at 5:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
Hi Jincheng Sun,
registering watermark functions for different attributes to allow each of
them to be used in a window is an interesting idea.
However, watermarks only work well if the streaming data is (almost) in
timestamp order. Since it is not possible to sort a stream, all attributes
that would qualify as event-time attributes need to be in almost the same
order. I think this limits the benefits of having multiple
watermark functions quite significantly. But maybe you have a good use case
that you can share where multiple event-time attributes would work well.
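The dependence of watermarks on arrival order can be sketched with a simple bounded-delay model. This is an assumption made for illustration, not Flink's implementation: the watermark is taken to be the largest timestamp seen so far minus a fixed `maxDelay`, and a record counts as late if its timestamp lies below the watermark at arrival. The name `lateFlags` is hypothetical.

```scala
object WatermarkSketch {
  // For each timestamp in arrival order, reports whether the record is late
  // relative to watermark = (max timestamp seen before it) - maxDelay.
  def lateFlags(timestamps: Seq[Long], maxDelay: Long): Seq[Boolean] = {
    // maxBefore(i) is the maximum of all timestamps that arrived before record i.
    val maxBefore = timestamps.scanLeft(Option.empty[Long]) { (maxSeen, ts) =>
      Some(maxSeen.fold(ts)(m => math.max(m, ts)))
    }
    timestamps.zip(maxBefore).map { case (ts, maxSeen) =>
      maxSeen.exists(m => ts < m - maxDelay)
    }
  }
}
```

For an attribute that arrives almost in order, such as 1, 2, 4, 3, 10, 5 with a delay of 2, only the last record is late in this model. A second attribute of the same records whose values arrive in an unrelated order would see far more late records under any reasonable delay, which is the limitation described above.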
So far our approach has been that a DataStream which is converted into a
Table already has timestamps and watermarks assigned. We also assumed that
a StreamTableSource would provide watermarks and timestamps and indicate
the name of the attribute that carries the timestamp.
@Stefano: That's great news. I'd suggest opening a pull request and having a
look at PR #3397, which handles the (partitioned) unbounded case. It would be
good to share some code between these approaches.
Thanks, Fabian
2017-02-28 18:17 GMT+01:00 Stefano Bortoli <stefano.bort...@huawei.com>:
Hi all,
I have completed a first implementation that works for the SQL query
SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2
PRECEDING) AS sumB FROM MyTable
I have SUM, MAX, MIN, AVG, COUNT implemented, but I could test it just on
simple queries such as the one above. Is there any specific case I should
be looking at?
Regards,
Stefano
-----Original Message-----
From: jincheng sun [mailto:sunjincheng...@gmail.com]
Sent: Tuesday, February 28, 2017 12:26 PM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time
Hi everyone, thanks for sharing your thoughts. I really like
Timo’s proposal, and I have a few thoughts I want to share.
We want to keep the query the same for batch and streaming. IMO, “process time”
is something special to DataStream, while it is not a well-defined term for
batch queries. So we are fairly free to create something new for processTime.
I think it is a good idea to add proctime as a reserved keyword for SQL.
Regarding “event time”, it is well defined for batch queries. So IMO, we
should keep the way of defining a streaming window exactly the same as for a batch
window. Therefore, the row for event time is nothing special, but just a
normal column. The major difference between batch and stream is that in
DataStream the event time column must be associated with a watermark
function. I really like the way Timo proposed, that we can select any
column as rowtime. But I think instead of just declaring a column as
rowtime (actually I do not think we need this special rowtime keyword), it
is better to register/associate the watermark function with this column when
creating the table. For DataStream, we will validate a rowtime column only
if it has been associated with the watermark function. Prototype code
to explain how it looks is shown below:
TableAPI:
toTable(tEnv, 'a, 'b, 'c)
.registeredWatermarks('a, waterMarkFunction1)
batchOrStreamTable
.window(Tumble over 5.milli on 'a as 'w)
.groupBy('w, 'b)
.select('b, 'a.count as cnt1, 'c.sum as cnt2)
SQL:
addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
.registeredWatermarks('a, waterMarkFunction1)
SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2
PRECEDING) AS sumB FROM MyTable
What do you think?
2017-02-22 23:44 GMT+08:00 Timo Walther <twal...@apache.org>:
Hi everyone,
I have created an issue [1] to track the progress of this topic. I have
written a little design document [2] on how we could implement the
indicators and which parts have to be touched. I would suggest
implementing a prototype, also to see what is possible and can
be integrated both in Flink and Calcite. Feedback is welcome.
Regards,
Timo
[1] https://issues.apache.org/jira/browse/FLINK-5884
[2] https://docs.google.com/document/d/1JRXm09x_wKst6z6UXdCGF9tgF1ueOAsFiQwahR72vbc/edit?usp=sharing
On 21/02/17 at 15:06, Fabian Hueske wrote:
Hi Xingcan,
thanks for your thoughts.
In principle you are right that the monotone attribute property would
be sufficient, however there are more aspects to consider than that.
Flink is a parallel stream processor engine, which means that data is
processed in separate processes and shuffled across them.
Maintaining a strict order when merging parallel streams would be
prohibitively expensive.
Flink's watermark mechanism helps operators to deal with out-of-order
data (due to out-of-order input or shuffles).
I don't think we can separate the discussion about time attributes
from watermarks if we want to use Flink as a processing engine and
not reimplement large parts from scratch.
When transforming a time attribute, we have to either align it with
existing watermarks or generate new watermarks.
If we want to allow all kinds of monotone transformations, we have to
adapt the watermarks, which is not trivial.
Instead, I think we should initially only allow very few monotone
transformations which are aligned with the existing watermarks. We
might later relax this condition if we see that users request this
feature.
You are right that we need to track which attribute can be used as a
time attribute (i.e., is increasing and guarded by watermarks).
For that we need to expose the time attribute when a Table is created
(either when a DataStream is converted like: stream.toTable(tEnv, 'a, 'b,
't.rowtime) or in a StreamTableSource) and track how it is used in
queries.
I am not sure if the monotone property would be the right
choice here, since data is only quasi-monotone and a monotone annotation
might trigger some invalid optimizations which change the semantics of
a query.
Right now, Calcite does not offer a quasi-monotone property (at least
I haven't found it).
Best, Fabian
2017-02-21 4:41 GMT+01:00 Xingcan Cui <xingc...@gmail.com>:
Hi all,
As I said in another thread, the main difference between stream and
table is that a stream is an ordered list while a table is an
unordered set.
Without considering the out-of-order problem in practice, both
event-time and processing-time can be taken simply as a monotonically
increasing field, and that's why the given query[1] would work. In
other words, we must guarantee that the "SELECT MAX(t22.rowtime) ..."
subquery returns a single value that can be retrieved from the
cached dynamic table, since it's dangerous to join two un-windowed
streams.
Under this circumstance, I just consider adding a "monotonic
hint" (INC or DEC) to the field of a (generalized) table (maybe using an
annotation on the registerDataXX method) that can be used to
indicate whether a field is monotonically increasing or decreasing.
Then by taking rowtime as a common (monotonically increasing) field,
there are several benefits:
1) This can unify the table and stream by introducing a total ordering
relation to an unordered set.
2) These fields can be modified arbitrarily as long as they keep the
declared monotonic feature, and the watermark problem does not exist
any more.
3) The monotonic hint will be useful in the query optimization
process.
What do you think?
Best,
Xingcan
[1]
SELECT t1.amount, t2.rate FROM
table1 AS t1,
table2 AS t2
WHERE
t1.currency = t2.currency AND
t2.rowtime = (
SELECT MAX(t22.rowtime) FROM table2 AS t22
WHERE t22.rowtime <= t1.rowtime)
On Tue, Feb 21, 2017 at 2:52 AM, Fabian Hueske <fhue...@gmail.com> wrote:
Hi everybody,
When Timo wrote to the Calcite mailing list, Julian Hyde replied
and gave good advice and explained why a system attribute for
event-time would be a problem [1].
I thought about this and agree with Julian.
Here is a document that describes the problem, the constraints in Flink,
and a proposal for how to handle processing time and event time
in Table API and SQL:
-> https://docs.google.com/document/d/1MDGViWA_TCqpaVoWub7u_GY4PMFSbT8TuaNl-EpbTHQ
Please have a look, comment and ask questions.
Thank you,
Fabian
[1] https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E
2017-02-16 1:18 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
Thanks everybody for the comments.
Actually, I think we do not have much choice when deciding whether to use
attributes or functions.
Consider the following join query:
SELECT t1.amount, t2.rate FROM
table1 AS t1,
table2 AS t2
WHERE
t1.currency = t2.currency AND
t2.rowtime = (
SELECT MAX(t22.rowtime) FROM table2 AS t22
WHERE t22.rowtime <= t1.rowtime)
The query joins two streaming tables. Table 1 is a streaming table
with amounts in a certain currency. Table 2 is a (slowly changing)
streaming table of currency exchange rates.
We want to join the amounts stream with the exchange rate of the
corresponding currency that is valid (i.e., last received value ->
MAX(rowtime)) at the rowtime of the amounts row.
In order to specify the query, we need to refer to the rowtime of
the different tables. Hence, we need a way to relate the rowtime expression
(or marker) to a table.
This is not possible with a parameterless scalar function.
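The intended semantics of this join (for each amount, the last exchange rate received at or before the amount's rowtime) can be sketched on plain collections. This is only an illustration on bounded in-memory data, not a streaming implementation. The case classes and `joinWithLatestRate` are hypothetical names, and the sketch additionally assumes that the rate lookup is restricted to the amount's own currency, which the query text does not state inside the subquery.

```scala
object RateJoinSketch {
  final case class Amount(currency: String, amount: Double, rowtime: Long)
  final case class Rate(currency: String, rate: Double, rowtime: Long)

  // For each amount, picks the rate of the same currency with the greatest
  // rowtime that is not later than the amount's rowtime. Amounts without
  // such a rate produce no output row, as in an inner join.
  def joinWithLatestRate(amounts: Seq[Amount], rates: Seq[Rate]): Seq[(Double, Double)] =
    for {
      a <- amounts
      candidates = rates.filter(r => r.currency == a.currency && r.rowtime <= a.rowtime)
      if candidates.nonEmpty
    } yield (a.amount, candidates.maxBy(_.rowtime).rate)
}
```

Both sides need their own rowtime in the comparison `r.rowtime <= a.rowtime`, which is the reason the rowtime has to be relatable to a specific table.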
I'd like to comment on the concerns regarding the performance:
In fact, the columns could be completely virtual and only exist
during query parsing and validation.
During execution, we can directly access the rowtime metadata of a Flink
streaming record (which is present anyway) or look up the current
processing time from the machine clock. So the processing overhead would
actually be the same as with a marker function.
Regarding the question on what should be allowed with a system attribute:
IMO, it could be used as any other attribute. We need it at least in GROUP
BY, ORDER BY, and WHERE to define windows and joins. We could also allow
accessing it in SELECT if we want to give users access to rowtime and
processing time. So @Haohui, your query could be supported.
However, what would not be allowed is to modify the value of the
rows, e.g., by naming another column rowtime; "SELECT
sometimestamp AS rowtime" would not be allowed, because Flink does
not support modifying the event time of a row (for good reasons) and
processing time should not be modifiable anyway.
@Timo:
I think the approach to only use the system columns during parsing
and validation and converting them to expressions afterwards makes
a lot of sense.
The question is how this approach could be nicely integrated with
Calcite.
Best, Fabian
2017-02-15 16:50 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
Hi,
My initial thought would be that it makes more sense to have procTime()
and rowTime() only as functions which in fact are to be used as markers.
Having the value (even from special system attributes) does not make sense
in some scenarios, such as the ones for creating windows, e.g.,
if you have SELECT Count(*) OVER (ORDER BY procTime()...).
If you get the value of procTime you cannot do anything, as you need the
marker to know how to construct the window logic.
However, your final idea of having "implement some rule/logic
that translates the attributes to special RexNodes internally" I believe is
good and gives a solution to both problems. On the one hand, for those
scenarios where you need the value, you can access the
value, while for others you can see the special type of the RexNode and
use it as a marker.
Regarding keeping this data in a table... I am not sure, as
you would say we need to augment the data with two fields whether needed or
not... this is not necessarily very efficient.
Dr. Radu Tudoran
-----Original Message-----
From: Timo Walther [mailto:twal...@apache.org]
Sent: Wednesday, February 15, 2017 9:33 AM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time
Hi all,
at first I also thought that built-in functions (rowtime() and proctime())
are the easiest solution. However, I think to be future-proof we should
make them system attributes, especially to relate them to a corresponding
table in case of multiple tables. Logically they are attributes of each
row, which is already done in the Table API.
I will ask on the Calcite ML if there is a good way of integrating system
attributes. Right now, I would propose the following implementation:
- we introduce a custom row type (extending RelDataType)
- in a streaming environment every row has two attributes by default
  (rowtime and proctime)
- we do not allow creating a row type with those attributes (this should
  already prevent `SELECT field AS rowtime FROM ...`)
- we need to ensure that these attributes are not part of expansion like
  `SELECT * FROM ...`
- implement some rule/logic that translates the attributes to special
  RexNodes internally, such that the optimizer does not modify these
  attributes
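The first four points of this proposal can be illustrated with a toy model. This is only a sketch under stated assumptions: `StreamRowType`, `resolve` and `expand_star` are hypothetical names invented for the illustration, and nothing here uses Calcite's `RelDataType` or Flink's actual classes.

```python
# Toy model of a streaming row type with system time attributes.
# StreamRowType and its methods are hypothetical; this is not Calcite or Flink code.

SYSTEM_ATTRIBUTES = ("rowtime", "proctime")


class StreamRowType:
    def __init__(self, user_fields):
        # Reject user-defined fields that collide with a system attribute,
        # which is what would block `SELECT field AS rowtime FROM ...`.
        for name in user_fields:
            if name.lower() in SYSTEM_ATTRIBUTES:
                raise ValueError(f"'{name}' is a reserved system attribute")
        self.user_fields = list(user_fields)

    def resolve(self, name):
        # System attributes can always be referenced, although they are hidden.
        if name in self.user_fields or name in SYSTEM_ATTRIBUTES:
            return name
        raise KeyError(name)

    def expand_star(self):
        # `SELECT *` expands to user fields only.
        return list(self.user_fields)


row_type = StreamRowType(["a", "b", "c"])
star_fields = row_type.expand_star()
resolved = row_type.resolve("rowtime")
```

In this model the time attributes stay addressable by name but never appear in a star expansion, and a schema that tries to declare its own `rowtime` is rejected up front.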
What do you think?
Regards,
Timo
On 15/02/17 at 03:36, Xingcan Cui wrote:
Hi all,
thanks for this thread.
@Fabian If I didn't miss the point, the main difference between the two
approaches is whether or not to treat these time attributes as common
table fields that are directly available to users. Either way, these time
attributes should be attached to records (right?), and the discussion is
about whether to give them public qualifiers like other common fields, or
private qualifiers with related get/set methods.
The former (system attributes) approach will be more compatible with
existing SQL read-only operations (e.g., select, join), but we need to add
restrictions on SQL modification operations (like what?). I think there is
no need to forbid users from modifying these attributes via Table API
operations (like a map function). Just inform them about these special
attribute names, like the system built-in aggregator names in iterations.
As for the built-in function approach, I don't know if, for now, there are
functions applied to a single row (maybe the value access functions like
COMPOSITE.get(STRING)?). It seems that most of the built-in functions work
on a single field or on columns, so it would be mountains of work if we
want to add a new kind of function to SQL. Maybe all existing operations
would have to be modified to support it.
All in all, if there is existing support for single-row functions, I
prefer the built-in function approach. Otherwise the system attributes
approach should be better. After all, there are not so many modification
operations in SQL, and maybe we can use aliases to support setting time
attributes (just a hypothesis, not sure if it's feasible).
@Haohui I think the given query is valid if we add an aggregate function
to (PROCTIME() - ROWTIME()) / 1000, and it should be executed efficiently.
Best,
Xingcan
On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricet...@gmail.com> wrote:
Hi,
Thanks for starting the discussion. I can see there are multiple
trade-offs in these two approaches. One question I have is to which extent
Flink wants to open its APIs to allow users to access both processing and
event time.
Before we talk about joins, my understanding is that the two approaches
you mentioned are essentially (1) treating the value of event / processing
time as first-class fields for each row, (2) limiting the scope of time
indicators to only specifying windows. Take the following query as an
example:
SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table GROUP BY
FLOOR(PROCTIME() TO MINUTES)
There are several questions we can ask:
(1) Is it a valid query?
(2) How efficient will the query be?
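To make the two questions concrete, here is a minimal sketch of what such a query could compute. It assumes that an aggregate (an average, chosen here only for illustration) is applied to the latency expression, since a non-aggregated expression under GROUP BY has no single value per group. The function name and the millisecond tuples are assumptions, not Flink APIs.

```python
from collections import defaultdict

MILLIS_PER_MINUTE = 60_000


def avg_latency_per_minute(events):
    """events: iterable of (rowtime_ms, proctime_ms) pairs."""
    sums = defaultdict(lambda: [0.0, 0])
    for rowtime_ms, proctime_ms in events:
        # FLOOR(PROCTIME() TO MINUTES): start of the processing-time minute.
        minute = proctime_ms - proctime_ms % MILLIS_PER_MINUTE
        acc = sums[minute]
        acc[0] += (proctime_ms - rowtime_ms) / 1000  # latency in seconds
        acc[1] += 1
    return {minute: total / count for minute, (total, count) in sums.items()}


events = [(1_000, 3_000), (2_000, 6_000), (60_000, 61_000)]
latency = avg_latency_per_minute(events)
```

Only a running sum and count per minute are kept, so the state is small. Both time values are needed for every record, though, which is where the two approaches differ in how they expose them.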
For this query I can see arguments from both sides. I think at the end of
the day it really comes down to what Flink wants to support. After working
on FLINK-5624 I'm more inclined to support the second approach (i.e.,
built-in functions). The main reason is that the APIs of Flink are
designed to separate times from the real payloads. It probably makes sense
for the Table / SQL APIs to have the same design.
For joins I don't have a clear answer off the top of my head. Flink
requires two streams to be put in the same window before doing the joins.
This is essentially a subset of what SQL can express. I don't know what
would be the best approach here.
Regards,
Haohui
On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhue...@gmail.com> wrote:
Hi,
It would be as in the query I gave as an example before:
SELECT
  a,
  SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 PRECEDING
    AND CURRENT ROW) AS sumB
FROM myStream
Here "proctime" would be a system attribute of the table "myStream".
The table would also have another system attribute called "rowtime",
which would be used to indicate event time semantics.
These attributes would always be present in tables which are derived from
streams.
Because we still require that streams have timestamps and watermarks
assigned (either by the StreamTableSource or somewhere downstream in the
DataStream program) when they are converted into a table, there is no need
to register anything.
Does that answer your questions?
Best, Fabian
2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
Hi Fabian,
Thanks for starting the discussion. Before I give my thoughts on this, can
you please give some examples of how you would see the option of using
"system attributes"?
Do you use this when you register the stream as a table, do you use it
when you call an SQL query, do you use it when you translate a table back
to a stream / write it to a dynamic table?
Dr. Radu Tudoran
-----Original Message-----
From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, February 14, 2017 1:01 AM
To: dev@flink.apache.org
Subject: [DISCUSS] Table API / SQL indicators for event and processing time
Hi,
I'd like to start a discussion about how Table API / SQL queries indicate
whether an operation is done in event or processing time.
1) Why do we need to indicate the time mode?
We need to distinguish event time and processing time mode for operations
in queries in order to have the semantics of a query fully defined.
This cannot be done globally in the TableEnvironment because some queries
explicitly request an expression, such as the ORDER BY clause of an OVER
window with PRECEDING / FOLLOWING clauses.
So we need a way to specify something like the following query:
SELECT
  a,
  SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 PRECEDING
    AND CURRENT ROW) AS sumB
FROM myStream
where "proctime" indicates processing time. Equivalently, "rowtime" would
indicate event time.
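As a rough illustration of what this query asks for in processing time, the following sketch keeps, per value of c, the current row and up to two preceding rows in arrival order and emits their sum. It is a toy model of the semantics only, with an assumed function name and dict-shaped rows, not how Flink would implement the operator.

```python
from collections import defaultdict, deque


def sum_over_last_rows(rows, preceding=2):
    """rows: iterable of dicts with keys a, b, c, in arrival (processing-time) order."""
    windows = defaultdict(lambda: deque(maxlen=preceding + 1))
    out = []
    for row in rows:
        window = windows[row["c"]]  # PARTITION BY c
        window.append(row["b"])     # current row plus up to `preceding` earlier rows
        out.append((row["a"], sum(window)))
    return out


rows = [
    {"a": "x1", "b": 1, "c": "k1"},
    {"a": "x2", "b": 10, "c": "k2"},
    {"a": "x3", "b": 2, "c": "k1"},
    {"a": "x4", "b": 3, "c": "k1"},
    {"a": "x5", "b": 4, "c": "k1"},
]
result = sum_over_last_rows(rows)
```

With ORDER BY rowtime the rows would instead have to be ordered by their event timestamps before being added to the window, which is why the time mode has to be stated in the query itself.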
2) Current state
The current master branch implements time support only for grouping
windows in the Table API.
Internally, the Table API converts a 'rowtime symbol (which looks like a
regular attribute) into a special expression which indicates event-time.
For example:
table
  .window(Tumble over 5.milli on 'rowtime as 'w)
  .groupBy('a, 'w)
  .select(...)
defines a tumbling event-time window.
Processing-time is indicated by omitting a time attribute
(table.window(Tumble over 5.milli as 'w) ).
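The grouping performed by such a tumbling event-time window can be sketched as follows. This is a simplified model under assumptions: windows are aligned to multiples of the window size, a count stands in for the elided select(...), and the function names are made up for the illustration.

```python
from collections import defaultdict


def tumbling_window_start(timestamp_ms, size_ms):
    # Windows are aligned to multiples of size_ms: [start, start + size_ms).
    return timestamp_ms - timestamp_ms % size_ms


def count_per_key_and_window(records, size_ms=5):
    """records: iterable of (a, rowtime_ms); mirrors groupBy('a, 'w)."""
    counts = defaultdict(int)
    for a, rowtime_ms in records:
        counts[(a, tumbling_window_start(rowtime_ms, size_ms))] += 1
    return dict(counts)


records = [("k", 0), ("k", 4), ("k", 5), ("j", 12)]
counts = count_per_key_and_window(records)
```

In the processing-time variant the timestamp would be the wall-clock time at which the operator sees the record rather than a value carried by the record.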
3) How can we do that in SQL?
In SQL we cannot add special expressions without touching the parser,
which we don't want to do because we want to stick to the SQL standard.
Therefore, I see only two options: adding system attributes or
(parameterless) built-in functions. I list some pros and cons of the
approaches below:
1. System Attributes:
+ most natural way to access a property of a record.
+ works with joins, because time attributes can be related to tables
- We need to ensure the attributes are not writable and always present in
  streaming tables (i.e., they should be system defined attributes).
- Need to adapt existing Table API expressions (will not change the API
  but some parts of the internal translation)
- Event time value must be set when the stream is converted, processing
  time is evaluated on the fly
2. Built-in Functions
+ Users could try to modify time attributes, which is not possible with
  functions
- do not work with joins, because we need to address different relations
- not a natural way to access a property of a record
I think the only viable choice is system attributes, because built-in
functions cannot be used for joins.
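The join argument can be made concrete with a small sketch. It models a joined row as a mapping from table alias to that table's row; `resolve_time_attribute` is a hypothetical helper, and the aliases `l` and `r` are assumptions for the example.

```python
def resolve_time_attribute(joined_row, table=None, attribute="rowtime"):
    """joined_row maps a table alias to that table's row (a dict)."""
    if table is not None:
        # System attribute style: `l.rowtime` names exactly one input.
        return joined_row[table][attribute]
    # Parameterless function style: `rowtime()` carries no table reference.
    candidates = [alias for alias, row in joined_row.items() if attribute in row]
    if len(candidates) != 1:
        raise LookupError(f"{attribute}() is ambiguous across {sorted(candidates)}")
    return joined_row[candidates[0]][attribute]


joined = {
    "l": {"id": 1, "rowtime": 1_000},
    "r": {"id": 1, "rowtime": 1_250},
}
left_time = resolve_time_attribute(joined, table="l")
right_time = resolve_time_attribute(joined, table="r")
```

A qualified attribute picks one side of the join, while a call without arguments has nothing to select with once both inputs carry a timestamp.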
However, system attributes are the more complex solution because they need
a better integration with Calcite's SQL validator (preventing user
attributes which are named rowtime, for instance).
Since there are currently several contributions on the way (such as SQL
OVER windows, FLINK-5653 to FLINK-5658) that need time indicators, we need
a solution soon to be able to make progress.
There are two PRs, #3252 and #3271, which implement the built-in marker
functions proctime() and rowtime() and which could serve as a temporary
solution (since we do not work on joins yet).
I would like to suggest using these functions as a starting point (once
the PRs are merged) and later changing to the system attribute solution,
which needs a bit more time to be implemented.
I talked with Timo today about this issue and he said he would like to
investigate how we can implement this as system functions properly
integrated with Calcite and the SQL Validator.
What do others think?
Best, Fabian