Lee Y S created FLINK-24767:
-------------------------------

             Summary: A keyBy following countWindow does not preserve order 
within the same partition
                 Key: FLINK-24767
                 URL: https://issues.apache.org/jira/browse/FLINK-24767
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.13.3
            Reporter: Lee Y S


I wrote a simple test of the countWindow method (in Kotlin), shown below:



package aero.airlab.flinkjobs.headingreminder

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import kotlin.random.Random

object CountWindowTest {
 @JvmStatic
 fun main(args: Array<String>) {
  val env = StreamExecutionEnvironment.getExecutionEnvironment()
  env.setRuntimeMode(RuntimeExecutionMode.BATCH)

  val rand = Random(0)
  val data = (0..1000).map { Pair(rand.nextInt(10), it) }
  env.fromCollection(data).assignTimestampsAndWatermarks(
    WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
      .withTimestampAssigner { e, _ -> e.second.toLong() })
  .keyBy { it.first }
  .countWindow(3L, 1)
  .reduce { a, b -> b }
  .keyBy { it.first }
  .filter { it.first == 5 }
  .print()

  env.execute()
 }
}


The beginning of the output is shown below:

12> (5, 184)
12> (5, 18)
12> (5, 29)
12> (5, 37)
12> (5, 38)
12> (5, 112)
12> (5, 131)

The first line, (5, 184), is out of order relative to the rest: for a given 
key, the second value should only increase.
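
To make the ordering violation easy to detect, a check like the following could be run on the collected output. This is only a minimal sketch in plain Kotlin, independent of Flink. The helper name firstOutOfOrder is hypothetical, and it assumes that for a given key the second value should be strictly increasing, as it is in the test data above.

```kotlin
// Hypothetical helper (not part of Flink): returns the index of the first
// record whose value is not greater than the previous value seen for the
// same key, or -1 if every key's values are strictly increasing.
fun firstOutOfOrder(records: List<Pair<Int, Int>>): Int {
    val lastSeen = HashMap<Int, Int>()
    for ((index, record) in records.withIndex()) {
        val (key, value) = record
        val previous = lastSeen[key]
        if (previous != null && value <= previous) return index
        lastSeen[key] = value
    }
    return -1
}

fun main() {
    // The first lines of the output shown above.
    val observed = listOf(
        5 to 184, 5 to 18, 5 to 29, 5 to 37, 5 to 38, 5 to 112, 5 to 131
    )
    println(firstOutOfOrder(observed))
}
```

On the lines above it reports index 1, because (5, 18) arrives after (5, 184).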

The problem disappears if I remove the keyBy after the reduce, or if I use 
STREAMING mode instead of BATCH mode.
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)