[ https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lee Y S updated FLINK-24767:
----------------------------
Description:
I wrote a simple test of the countWindow method in Kotlin:
{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import kotlin.random.Random

object CountWindowTest {
    @JvmStatic
    fun main(args: Array<String>) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        env.setRuntimeMode(RuntimeExecutionMode.BATCH)
        // 1001 (key, index) pairs with random keys in 0..9; the index is also the event timestamp.
        val rand = Random(0)
        val data = (0..1000).map { Pair(rand.nextInt(10), it) }
        env.fromCollection(data)
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
                    .withTimestampAssigner { e, _ -> e.second.toLong() })
            .keyBy { it.first }
            // Sliding count window of size 3 and slide 1; the reduce keeps the newest element.
            .countWindow(3L, 1)
            .reduce { a, b -> b }
            // Second keyBy on the same key; without it the output is in order.
            .keyBy { it.first }
            .filter { it.first == 5 }
            .print()
        env.execute()
    }
}
{code}
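For reference, the output this job should produce can be worked out without Flink. Assuming the usual semantics of countWindow(size, slide) on a keyed stream (a global window whose count trigger fires every `slide` elements and whose count evictor keeps only the last `size` elements), countWindow(3L, 1) fires on every element, and reduce { a, b -> b } returns the newest element in the window. Each key should therefore emit every input element exactly once, in arrival order. The plain-Kotlin sketch below models that per-key behaviour. The function name simulateCountWindowLastReduce is hypothetical and not part of any Flink API.

```kotlin
// Plain-Kotlin model (no Flink) of
//   keyBy { it.first }.countWindow(size, slide).reduce { a, b -> b }
// Assumption: per key, the trigger fires after every `slide` elements, and the
// evictor keeps only the last `size` elements before the reduce is applied.
fun simulateCountWindowLastReduce(
    input: List<Pair<Int, Int>>,
    size: Int,
    slide: Int
): List<Pair<Int, Int>> {
    val buffers = mutableMapOf<Int, ArrayDeque<Pair<Int, Int>>>() // per-key window contents
    val counts = mutableMapOf<Int, Int>()                         // elements since last firing
    val output = mutableListOf<Pair<Int, Int>>()
    for (element in input) {
        val key = element.first
        val buffer = buffers.getOrPut(key) { ArrayDeque() }
        buffer.addLast(element)
        val seen = (counts[key] ?: 0) + 1
        if (seen == slide) {
            counts[key] = 0
            // Evict down to the last `size` elements, then reduce with { a, b -> b }.
            while (buffer.size > size) buffer.removeFirst()
            output.add(buffer.reduce { _, b -> b })
        } else {
            counts[key] = seen
        }
    }
    return output
}
```

Run on the job's data and filtered to key 5, this model yields that key's values in strictly ascending order, because the second component is each element's index in the input.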
The beginning of the output is:
{noformat}
12> (5, 184)
12> (5, 18)
12> (5, 29)
12> (5, 37)
12> (5, 38)
12> (5, 112)
12> (5, 131)
{noformat}
The first line, (5, 184), is out of order relative to the rest.
The problem disappears if I either remove the keyBy after the reduce or run in
STREAMING mode instead of BATCH mode.
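To make "out of order" precise: among records that share a key, the value should never decrease. The sketch below checks this property for lines in the printed format. Both helpers, parsePrintedPair and outOfOrderIndices, are hypothetical and not part of Flink. The leading "12>" is treated as the subtask prefix that print() adds. On the excerpt above, the check flags only (5, 18), the point where the order breaks right after the stray (5, 184).

```kotlin
// Matches print() lines such as "12> (5, 184)": subtask prefix, then (key, value).
val printedPair = Regex("""\d+>\s*\((-?\d+),\s*(-?\d+)\)""")

// Parses one printed line into a (key, value) pair; fails on unexpected input.
fun parsePrintedPair(line: String): Pair<Int, Int> {
    val match = requireNotNull(printedPair.matchEntire(line.trim())) { "Unexpected line: $line" }
    val (key, value) = match.destructured
    return key.toInt() to value.toInt()
}

// Returns the indices of records whose value is smaller than the previous
// value seen for the same key, i.e. where per-key ordering is violated.
fun outOfOrderIndices(records: List<Pair<Int, Int>>): List<Int> {
    val lastValue = mutableMapOf<Int, Int>()
    val violations = mutableListOf<Int>()
    records.forEachIndexed { index, (key, value) ->
        val previous = lastValue[key]
        if (previous != null && value < previous) violations.add(index)
        lastValue[key] = value
    }
    return violations
}
```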
> A keyBy following countWindow does not preserve order within the same partition
> -------------------------------------------------------------------------------
>
> Key: FLINK-24767
> URL: https://issues.apache.org/jira/browse/FLINK-24767
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.13.3
> Reporter: Lee Y S
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)