[ https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17438953#comment-17438953 ]
Seth Wiesman commented on FLINK-24767:
--------------------------------------

When executing a `keyBy` under batch execution, the DataStream API groups elements by key using a sort[1]. I have not investigated, but my suspicion is this is not a stable sort and you are seeing the result of that.

[~dwysakowicz] I'm guessing this is expected?

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/#state-backends--state

> A keyBy following countWindow does not preserve order within the same partition
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-24767
>                 URL: https://issues.apache.org/jira/browse/FLINK-24767
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.13.3
>            Reporter: Lee Y S
>            Priority: Major
>
> I wrote a simple test of the countWindow method (in Kotlin) as below
> {code:java}
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.api.common.eventtime.WatermarkStrategy
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import kotlin.random.Random
>
> object CountWindowTest {
>     @JvmStatic
>     fun main(args: Array<String>) {
>         val env = StreamExecutionEnvironment.getExecutionEnvironment()
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>         val rand = Random(0)
>         val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>         env.fromCollection(data).assignTimestampsAndWatermarks(
>             WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>                 .withTimestampAssigner { e, _ -> e.second.toLong() })
>             .keyBy { it.first }
>             .countWindow(3L, 1)
>             .reduce { a, b -> b }
>             .keyBy { it.first }
>             .filter { it.first == 5 }
>             .print()
>         env.execute()
>     }
> }
> {code}
> The beginning of the output is as below
> 12> (5, 184)
> 12> (5, 18)
> 12> (5, 29)
> 12> (5, 37)
> 12> (5, 38)
> 12> (5, 112)
> 12> (5, 131)
> The first line (5, 184) is not in order from the rest.
> The problem disappears if I remove the keyBy after the reduce or use stream mode instead of batch mode.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
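
The stable-sort suspicion in the comment can be illustrated without Flink. The following is a minimal sketch in plain Kotlin, not Flink's actual batch sorter: it reuses the test data from the issue and shows that grouping by key with a stable sort keeps records of the same key in arrival order, while a key-only sort over reordered input does not have to. The helpers `isOrderedWithinKey` and `sortWithTieBreak` are hypothetical names introduced here for illustration, and the shuffle only stands in for whatever reordering an unstable sort might introduce.

```kotlin
import kotlin.random.Random

// Hypothetical helper: true if, for every key (first component),
// the sequence numbers (second component) appear in ascending order.
fun isOrderedWithinKey(records: List<Pair<Int, Int>>): Boolean =
    records.groupBy { it.first }
        .values
        .all { group -> group.zipWithNext().all { (a, b) -> a.second <= b.second } }

// Hypothetical helper: sort by key and break ties on the sequence number,
// so the result does not depend on whether the sort is stable.
fun sortWithTieBreak(records: List<Pair<Int, Int>>): List<Pair<Int, Int>> =
    records.sortedWith(compareBy({ it.first }, { it.second }))

fun main() {
    // Same data as in the issue: a random key in 0..9 paired with an ascending index.
    val rand = Random(0)
    val data = (0..1000).map { Pair(rand.nextInt(10), it) }

    // Kotlin's sortedBy is documented as stable: equal keys keep their input order.
    val stable = data.sortedBy { it.first }
    println("stable sort keeps per-key order: ${isOrderedWithinKey(stable)}")

    // Assumption: shuffling before a key-only sort stands in for an unstable sort.
    // Keys are still grouped, but order within a key is no longer guaranteed.
    val reordered = data.shuffled(Random(1)).sortedBy { it.first }
    println("key-only sort after reordering keeps per-key order: ${isOrderedWithinKey(reordered)}")

    // An explicit tie-break on the sequence number restores a deterministic order.
    val fixed = sortWithTieBreak(data.shuffled(Random(1)))
    println("sort with tie-break keeps per-key order: ${isOrderedWithinKey(fixed)}")
}
```

If the batch sorter behaves like the key-only case, a tie-break on the timestamp or sequence number is one way to make the per-key order explicit rather than incidental. Whether Flink's sorter is in fact unstable is not confirmed in this thread.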