[
https://issues.apache.org/jira/browse/FLINK-25007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ori Popowski updated FLINK-25007:
---------------------------------
Description:
I am creating a simple application with events firing every 15 seconds. I
created a {{SessionWindowTimeGapExtractor}} which returns 90 minutes, but after
the 4th event, it should return 1 millisecond. I expected that after the 4th
event, a session window will trigger, but it's not what happens. In reality the
session window never triggers, even though after the 4th event, the session gap
is effectively 1 millisecond and the interval between events is 15 seconds.
{code:java}
object Main {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val now = Instant.now()
senv
.addSource(new Source(now))
.assignAscendingTimestamps(_.time.toEpochMilli)
.keyBy(_ => 1)
.window(DynamicEventTimeSessionWindows.withDynamicGap(new
SessionWindowTimeGapExtractor[Element] {
override def extract(element: Element): Long = {
if (element.sessionEnd) 1
else 90.minutes.toMillis
}
}))
.process(new ProcessWindowFunction[Element, Vector[Element], Int,
TimeWindow] {
override def process(k: Int, context: Context, elements:
Iterable[Element], out: Collector[Vector[Element]]): Unit = {
out.collect(elements.toVector)
}
})
.print()
senv.execute()
}
}
case class Element(id: Int, time: Instant, sessionEnd: Boolean = false)
class Source(now: Instant) extends RichSourceFunction[Element] {
@volatile private var isRunning = true
private var totalInterval = 0L
private var i = 0
override def run(ctx: SourceFunction.SourceContext[Element]): Unit = {
while (isRunning) {
val element = Element(i, now.plusMillis(totalInterval))
if (i >= 4) ctx.collect(element.copy(sessionEnd = true))
else ctx.collect(element)
i += 1
totalInterval += 15.seconds.toMillis
Thread.sleep(15.seconds.toMillis)
}
}
override def cancel(): Unit = {
isRunning = false
}
}{code}
was:
I am creating a simple application with events firing every 15 seconds. I
created a {{
SessionWindowTimeGapExtractor}} which returns 90 minutes, but after the 4th
event, it should return 1 millisecond. I expected that after the 4th event, a
session window will trigger, but it's not what happens. In reality the session
window never triggers, even though after the 4th event, the session gap is
effectively 1 millisecond and the interval between events is 15 seconds.
{code:java}
object Main {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val now = Instant.now()
senv
.addSource(new Source(now))
.assignAscendingTimestamps(_.time.toEpochMilli)
.keyBy(_ => 1)
.window(DynamicEventTimeSessionWindows.withDynamicGap(new
SessionWindowTimeGapExtractor[Element] {
override def extract(element: Element): Long = {
if (element.sessionEnd) 1
else 90.minutes.toMillis
}
}))
.process(new ProcessWindowFunction[Element, Vector[Element], Int,
TimeWindow] {
override def process(k: Int, context: Context, elements:
Iterable[Element], out: Collector[Vector[Element]]): Unit = {
out.collect(elements.toVector)
}
})
.print()
senv.execute()
}
}
case class Element(id: Int, time: Instant, sessionEnd: Boolean = false)
class Source(now: Instant) extends RichSourceFunction[Element] {
@volatile private var isRunning = true
private var totalInterval = 0L
private var i = 0
override def run(ctx: SourceFunction.SourceContext[Element]): Unit = {
while (isRunning) {
val element = Element(i, now.plusMillis(totalInterval))
if (i >= 4) ctx.collect(element.copy(sessionEnd = true))
else ctx.collect(element)
i += 1
totalInterval += 15.seconds.toMillis
Thread.sleep(15.seconds.toMillis)
}
}
override def cancel(): Unit = {
isRunning = false
}
}{code}
> Session window with dynamic gap doesn't work
> --------------------------------------------
>
> Key: FLINK-25007
> URL: https://issues.apache.org/jira/browse/FLINK-25007
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.12.0
> Environment: Local environment
> Reporter: Ori Popowski
> Priority: Major
>
> I am creating a simple application with events firing every 15 seconds. I
> created a {{SessionWindowTimeGapExtractor}} which returns 90 minutes, but
> after the 4th event, it should return 1 millisecond. I expected that after
> the 4th event, a session window will trigger, but it's not what happens. In
> reality the session window never triggers, even though after the 4th event,
> the session gap is effectively 1 millisecond and the interval between events
> is 15 seconds.
>
> {code:java}
> object Main {
> def main(args: Array[String]): Unit = {
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val now = Instant.now()
> senv
> .addSource(new Source(now))
> .assignAscendingTimestamps(_.time.toEpochMilli)
> .keyBy(_ => 1)
> .window(DynamicEventTimeSessionWindows.withDynamicGap(new
> SessionWindowTimeGapExtractor[Element] {
> override def extract(element: Element): Long = {
> if (element.sessionEnd) 1
> else 90.minutes.toMillis
> }
> }))
> .process(new ProcessWindowFunction[Element, Vector[Element], Int,
> TimeWindow] {
> override def process(k: Int, context: Context, elements:
> Iterable[Element], out: Collector[Vector[Element]]): Unit = {
> out.collect(elements.toVector)
> }
> })
> .print()
> senv.execute()
> }
> }
> case class Element(id: Int, time: Instant, sessionEnd: Boolean = false)
> class Source(now: Instant) extends RichSourceFunction[Element] {
> @volatile private var isRunning = true
> private var totalInterval = 0L
> private var i = 0
> override def run(ctx: SourceFunction.SourceContext[Element]): Unit = {
> while (isRunning) {
> val element = Element(i, now.plusMillis(totalInterval))
> if (i >= 4) ctx.collect(element.copy(sessionEnd = true))
> else ctx.collect(element)
> i += 1
> totalInterval += 15.seconds.toMillis
> Thread.sleep(15.seconds.toMillis)
> }
> }
> override def cancel(): Unit = {
> isRunning = false
> }
> }{code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)