Ori Popowski created FLINK-25007:
------------------------------------
Summary: Session window with dynamic gap doesn't work
Key: FLINK-25007
URL: https://issues.apache.org/jira/browse/FLINK-25007
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.12.0
Environment: Local environment
Reporter: Ori Popowski
I am creating a simple application with events firing every 15 seconds. I
created a {{
SessionWindowTimeGapExtractor}} which returns 90 minutes, but after the 4th
event, it should return 1 millisecond. I expected that after the 4th event, a
session window will trigger, but it's not what happens. In reality the session
window never triggers, even though after the 4th event, the session gap is
effectively 1 millisecond and the interval between events is 15 seconds.
{code:java}
object Main {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val now = Instant.now()
senv
.addSource(new Source(now))
.assignAscendingTimestamps(_.time.toEpochMilli)
.keyBy(_ => 1)
.window(DynamicEventTimeSessionWindows.withDynamicGap(new
SessionWindowTimeGapExtractor[Element] {
override def extract(element: Element): Long = {
if (element.sessionEnd) 1
else 90.minutes.toMillis
}
}))
.process(new ProcessWindowFunction[Element, Vector[Element], Int,
TimeWindow] {
override def process(k: Int, context: Context, elements:
Iterable[Element], out: Collector[Vector[Element]]): Unit = {
out.collect(elements.toVector)
}
})
.print()
senv.execute()
}
}
case class Element(id: Int, time: Instant, sessionEnd: Boolean = false)
class Source(now: Instant) extends RichSourceFunction[Element] {
@volatile private var isRunning = true
private var totalInterval = 0L
private var i = 0
override def run(ctx: SourceFunction.SourceContext[Element]): Unit = {
while (isRunning) {
val element = Element(i, now.plusMillis(totalInterval))
if (i >= 4) ctx.collect(element.copy(sessionEnd = true))
else ctx.collect(element)
i += 1
totalInterval += 15.seconds.toMillis
Thread.sleep(15.seconds.toMillis)
}
}
override def cancel(): Unit = {
isRunning = false
}
}{code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)