[ 
https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Y S updated FLINK-24767:
----------------------------
    Description: 
I wrote a simple test of the countWindow method (in Kotlin) as below

{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import kotlin.random.Random

object CountWindowTest {
  @JvmStatic
  fun main(args: Array<String>) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    val rand = Random(0)
    val data = (0..1000).map { Pair(rand.nextInt(10), it) }

    env.fromCollection(data).assignTimestampsAndWatermarks(
      WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
        .withTimestampAssigner { e, _ -> e.second.toLong() })
      .keyBy { it.first }
      .countWindow(3L, 1)
      .reduce { a, b -> b }
      .keyBy { it.first }
      .filter { it.first == 5 }
      .print()  
    env.execute()
  }
}
{code}

The beginning of the output is as below

12> (5, 184)
 12> (5, 18)
 12> (5, 29)
 12> (5, 37)
 12> (5, 38)
 12> (5, 112)
 12> (5, 131)

The first line (5, 184) is out of order relative to the rest.

The problem disappears if I remove the keyBy after the reduce or use stream 
mode instead of batch mode.
  

  was:
I wrote a simple test of the countWindow method (in Kotlin) as below


{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import kotlin.random.Randomobject

CountWindowTest {
  @JvmStatic
  fun main(args: Array<String>) {
  val env = StreamExecutionEnvironment.getExecutionEnvironment()
  env.setRuntimeMode(RuntimeExecutionMode.BATCH)
  val rand = Random(0)
  val data = (0..1000).map { Pair(rand.nextInt(10), it) }

  env.fromCollection(data).assignTimestampsAndWatermarks(
    WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
      .withTimestampAssigner { e, _ -> e.second.toLong() })
    .keyBy { it.first }
    .countWindow(3L, 1)
    .reduce { a, b -> b }
    .keyBy { it.first }
    .filter { it.first == 5 }
    .print()  env.execute()
  }
}
{code}


The beginning of the output is as below

12> (5, 184)
 12> (5, 18)
 12> (5, 29)
 12> (5, 37)
 12> (5, 38)
 12> (5, 112)
 12> (5, 131)

The first line (5, 184) is not in order from the rest.

The problem disappears if I remove the keyBy after the reduce or use stream 
mode instead of batch mode.
  


> A keyBy following countWindow does not preserve order within the same 
> partition
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-24767
>                 URL: https://issues.apache.org/jira/browse/FLINK-24767
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.13.3
>            Reporter: Lee Y S
>            Priority: Major
>
> I wrote a simple test of the countWindow method (in Kotlin) as below
> {code:java}
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.api.common.eventtime.WatermarkStrategy
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import kotlin.random.Random
> object CountWindowTest {
>   @JvmStatic
>   fun main(args: Array<String>) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment()
>     env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>     val rand = Random(0)
>     val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>     env.fromCollection(data).assignTimestampsAndWatermarks(
>       WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>         .withTimestampAssigner { e, _ -> e.second.toLong() })
>       .keyBy { it.first }
>       .countWindow(3L, 1)
>       .reduce { a, b -> b }
>       .keyBy { it.first }
>       .filter { it.first == 5 }
>       .print()  
>     env.execute()
>   }
> }
> {code}
> The beginning of the output is as below
> 12> (5, 184)
>  12> (5, 18)
>  12> (5, 29)
>  12> (5, 37)
>  12> (5, 38)
>  12> (5, 112)
>  12> (5, 131)
> The first line (5, 184) is out of order relative to the rest.
> The problem disappears if I remove the keyBy after the reduce or use stream 
> mode instead of batch mode.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to