[ 
https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17438953#comment-17438953
 ] 

Seth Wiesman edited comment on FLINK-24767 at 11/4/21, 10:19 PM:
-----------------------------------------------------------------

-When executing a `keyBy` under batch execution, the DataStream API groups 
elements by key using a sort[1]. I have not investigated, but my suspicion is 
this is not a stable sort and you are seeing the result of that.- 
[~dwysakowicz] -I'm guessing this is expected?- 

 

The sort grouping does respect event time timestamps, so I was mistaken. Is it 
possible to have a reproducible example that doesn't rely on random numbers? 
I'd also be curious to see if the countWindow has anything to do with it. Under 
the hood, count windows are based on evictors and that's a weird code path. 
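
In the meantime, here's roughly what I mean by a deterministic variant -- an untested sketch, just the snippet from the description with the Random-based keys replaced by `it % 10`, so every run sees the same input in the same order:

{code:java}
// Untested sketch: same pipeline as in the report, but with deterministic keys
// (it % 10) instead of Random, so the input order is identical on every run.
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object DeterministicCountWindowTest {
  @JvmStatic
  fun main(args: Array<String>) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    // Keys cycle 0..9 deterministically; the value doubles as the event timestamp.
    val data = (0..1000).map { Pair(it % 10, it) }
    env.fromCollection(data).assignTimestampsAndWatermarks(
      WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
        .withTimestampAssigner { e, _ -> e.second.toLong() })
      .keyBy { it.first }
      .countWindow(3L, 1)
      .reduce { _, b -> b }    // keep the latest element of each count window
      .keyBy { it.first }
      .filter { it.first == 5 }
      .print()
    env.execute()
  }
}
{code}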

 

 
[1]https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/#state-backends--state
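
For context on the evictor remark: if I remember the KeyedStream code correctly, `countWindow(size, slide)` expands to roughly the chain below (worth double-checking against the 1.13 sources). Starting from this explicit form and dropping or swapping the CountEvictor would be one way to check whether the evictor path is involved.

{code:java}
// Rough expansion of countWindow(3L, 1), from memory of KeyedStream -- not
// verified against the 1.13 sources. "stream" stands for the keyed stream in
// the reporter's snippet, i.e. the result of the first keyBy { it.first }.
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

stream
  .window(GlobalWindows.create())  // one never-ending global window per key
  .evictor(CountEvictor.of(3))     // keep at most 3 elements (the window size)
  .trigger(CountTrigger.of(1))     // fire on every element (the slide)
  .reduce { _, b -> b }
{code}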


was (Author: sjwiesman):
When executing a `keyBy` under batch execution, the DataStream API groups 
elements by key using a sort[1]. I have not investigated, but my suspicion is 
this is not a stable sort and you are seeing the result of that. [~dwysakowicz] 
I'm guessing this is expected? 

 

 
[1]https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/#state-backends--state

> A keyBy following countWindow does not preserve order within the same 
> partition
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-24767
>                 URL: https://issues.apache.org/jira/browse/FLINK-24767
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.13.3
>            Reporter: Lee Y S
>            Priority: Major
>
> I wrote a simple test of the countWindow method (in Kotlin) as below
> {code:java}
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.api.common.eventtime.WatermarkStrategy
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import kotlin.random.Random
> object CountWindowTest {
>   @JvmStatic
>   fun main(args: Array<String>) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment()
>     env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>     val rand = Random(0)
>     val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>     env.fromCollection(data).assignTimestampsAndWatermarks(
>       WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>         .withTimestampAssigner { e, _ -> e.second.toLong() })
>       .keyBy { it.first }
>       .countWindow(3L, 1)
>       .reduce { a, b -> b }
>       .keyBy { it.first }
>       .filter { it.first == 5 }
>       .print()
>     env.execute()
>   }
> }
> {code}
> The beginning of the output is as below
> 12> (5, 184)
> 12> (5, 18)
> 12> (5, 29)
> 12> (5, 37)
> 12> (5, 38)
> 12> (5, 112)
> 12> (5, 131)
> The first line, (5, 184), is out of order relative to the rest.
> The problem disappears if I remove the keyBy after the reduce, or if I use 
> streaming mode instead of batch mode.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
