Hi Xingcan,
If you need to guarantee the order also in the procTime case, a trick you can
use is to set the time characteristic of the environment to event time and to
assign the processing time as the timestamp of the incoming stream. You can do
this via
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Order] {
  // assign the current processing time as the event timestamp
  override def extractTimestamp(
      element: Order, // the element type of your stream
      previousElementTimestamp: Long): Long =
    System.currentTimeMillis()

  // emit watermarks that trail the current processing time
  override def getCurrentWatermark: Watermark =
    new Watermark(System.currentTimeMillis() - 1)
})
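For completeness, a minimal sketch of the surrounding setup (assuming the
Order case class and the orderA stream from your example):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// event time semantics, but with processing time assigned as the timestamps
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)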
Alternatively, you can control the stream source yourself and decide when the
events are emitted.
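For illustration, a minimal sketch of such a source (the class name
OrderedSource is hypothetical, and it assumes the Order case class from your
example):

import org.apache.flink.streaming.api.functions.source.SourceFunction

// Emits the given orders one by one with a short pause in between,
// so that their processing-time order is deterministic.
class OrderedSource(orders: Seq[Order]) extends SourceFunction[Order] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
    for (order <- orders if running) {
      ctx.collect(order)
      Thread.sleep(10) // space out emissions to stabilize the arrival order
    }
  }

  override def cancel(): Unit = { running = false }
}

val orderA: DataStream[Order] = env.addSource(new OrderedSource(Seq(
  Order(1L, "beer", 1), Order(2L, "diaper", 2),
  Order(3L, "diaper", 3), Order(4L, "rubber", 4))))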
Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
-----Original Message-----
From: [email protected] [mailto:[email protected]]
Sent: Tuesday, April 11, 2017 2:24 PM
To: Stefano Bortoli; [email protected]
Subject: RE: Question about the process order in stream aggregate
Resending to [email protected]
Hi Xingcan,
This is expected behavior. In general, it is not possible to guarantee the
order of results with processing time semantics.
Your query is translated as follows:
CollectionSrc(1) -round-robin-> MapFunc(n) -hash-part-> ProcessFunc(n) -fwd->
MapFunc(n) -fwd-> Sink(n)
The order of the records changes because of the connection between the source
and the first map function. Here, records are distributed round-robin to
increase the parallelism from 1 to n. The parallel instances of the map
function may forward the records in a different order to the ProcessFunction
that computes the aggregation.
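If you only need a deterministic order for the test, one option (a sketch,
assuming the default environment setup) is to run the job with parallelism 1
so that no round-robin redistribution takes place:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// a single parallel instance per operator: the source order is
// preserved all the way to the aggregation
env.setParallelism(1)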
Hope this helps,
Fabian
From: Stefano Bortoli
Sent: Tuesday, April 11, 2017 2:10 PM
To: [email protected]
Subject: RE: Question about the process order in stream aggregate
Hi Xingcan,
Are you using parallelism 1 for the test? procTime semantics deals with the
objects as they are loaded into the operators. It could be that co-occurring
partitioned events (within the same millisecond) are processed in parallel and
the output is then produced in a different order.
I suggest you have a look at the integration tests to verify that the
configuration of your experiment is correct.
Best,
Stefano
-----Original Message-----
From: Xingcan Cui [mailto:[email protected]]
Sent: Tuesday, April 11, 2017 5:31 AM
To: [email protected]
Subject: Question about the process order in stream aggregate
Hi all,
I run some tests for stream aggregation on rows. The data stream is simply
registered as
val orderA: DataStream[Order] = env.fromCollection(Seq(
Order(1L, "beer", 1),
Order(2L, "diaper", 2),
Order(3L, "diaper", 3),
Order(4L, "rubber", 4)))
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
and the SQL is defined as
SELECT product, SUM(amount) OVER (PARTITION BY product ORDER BY procTime()
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM OrderA.
My expected output should be
2> Result(beer,1)
2> Result(diaper,2)
1> Result(rubber,4)
2> Result(diaper,5).
However, sometimes I get the following output
2> Result(beer,1)
2> Result(diaper,3)
1> Result(rubber,4)
2> Result(diaper,5).
It seems that the rows Order(2L, "diaper", 2) and Order(3L, "diaper", 3) are
processed out of order. Is that normal?
BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, their order
is always preserved.
Thanks,
Xingcan