Resending to dev@f.a.o

Hi Xingcan,

This is expected behavior. In general, is not possible to guarantee results for 
processing time.

Your query is translated as follows:

CollectionSrc(1) -round-robin-> MapFunc(n) -hash-part-> ProcessFunc(n) -fwd-> 
MapFunc(n) -fwd-> Sink(n)

The order of records is changed because of the connection between source and 
first map function. Here, records are distributed round robin to increase the 
parallelism from 1 to n. The parallel instances of map might forward the 
records in different order to the ProcessFunction that computes the 
aggregation. 

Hope this helps,
Fabian


Von: Stefano Bortoli
Gesendet: Dienstag, 11. April 2017 14:10
An: dev@flink.apache.org
Betreff: RE: Question about the process order in stream aggregate

Hi Xingcan,

Are you using parallelism 1 for the test?  procTime semantics deals with the 
objects as they loaded in the operators. It could be the co-occuring 
partitioned events (in the same MS time frame) are processed in parallel and 
then the output is produced in different order.

I suggest you to have a look at the integration test to verify that the 
configuration of your experiment is correct.

Best,
Stefano

-----Original Message-----
From: Xingcan Cui [mailto:xingc...@gmail.com] 
Sent: Tuesday, April 11, 2017 5:31 AM
To: dev@flink.apache.org
Subject: Question about the process order in stream aggregate

Hi all,

I run some tests for stream aggregation on rows. The data stream is simply 
registered as

val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 1),
      Order(2L, "diaper", 2),
      Order(3L, "diaper", 3),
      Order(4L, "rubber", 4)))
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),

and the SQL is defined as

select product, sum(amount) over (partition by product order by procTime() rows 
between unbounded preceding and current row from orderA).

My expected output should be

2> Result(beer,1)
2> Result(diaper,2)
1> Result(rubber,4)
2> Result(diaper,5).

However, sometimes I get the following output

2> Result(beer,1)
2> Result(diaper,3)
1> Result(rubber,4)
2> Result(diaper,5).

It seems that the row "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)"
are out of order. Is that normal?

BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the order for 
them can always be preserved.

Thanks,
Xingcan

Reply via email to