[ https://issues.apache.org/jira/browse/FLINK-4428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrey Zagrebin closed FLINK-4428.
----------------------------------
Resolution: Resolved
The state TTL has been implemented for processing time in FLINK-3089.
I am closing the issue. Please reopen it if you think we need more discussion
here.
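For illustration, here is a minimal sketch of how the processing-time state TTL from FLINK-3089 could cover the deduplication step of the report quoted below. It assumes a Flink 1.x release from 1.6 on, where StateTtlConfig.newBuilder takes an org.apache.flink.api.common.time.Time. Because flatMapWithState does not expose its state descriptor, the step is rewritten as a hypothetical RichFlatMapFunction named DedupUv. The two-day TTL is an arbitrary choice, picked only to be longer than the one-day window.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)

object DedupUv {
  // Hypothetical TTL of two days: longer than the one-day window, so an
  // active day's entry is not dropped while that window is still firing.
  val ttlConfig: StateTtlConfig = StateTtlConfig
    .newBuilder(Time.days(2))
    // the TTL clock restarts whenever the entry is created or written
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    // expired values are treated as absent even if not yet cleaned up
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build()
}

// Hypothetical replacement for the flatMapWithState step: emits a WindowUv
// only when its count differs from the last count stored for the key.
class DedupUv extends RichFlatMapFunction[WindowUv, WindowUv] {
  @transient private var lastCount: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    val descriptor =
      new ValueStateDescriptor[java.lang.Long]("lastCount", classOf[java.lang.Long])
    descriptor.enableTimeToLive(DedupUv.ttlConfig)
    lastCount = getRuntimeContext.getState(descriptor)
  }

  override def flatMap(in: WindowUv, out: Collector[WindowUv]): Unit = {
    val prev = lastCount.value() // null if never written or already expired
    if (prev == null || prev.longValue() != in.uv) {
      lastCount.update(java.lang.Long.valueOf(in.uv))
      out.collect(in)
    }
  }
}
```

It would be wired in as .keyBy(uv => (uv.platform, uv.windowStart)).flatMap(new DedupUv) in place of flatMapWithState. Depending on the release, expired entries that are never read again, such as the keys of past days, may only be freed by a cleanup strategy set on the StateTtlConfig builder. It is worth checking the state TTL documentation of the release in use.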
> Method map/flatMapWithState may need an eviction policy
> -------------------------------------------------------
>
> Key: FLINK-4428
> URL: https://issues.apache.org/jira/browse/FLINK-4428
> Project: Flink
> Issue Type: New Feature
> Components: API / DataStream
> Affects Versions: 1.1.2
> Reporter: Renkai Ge
> Priority: Major
>
> I want to count the number of unique visitors to a website every day.
> If the number changes, I want to get the new number within 1 second, and
> nothing should be emitted if the number doesn't change. I implemented this
> with a 1-day time window, a trigger that fires every second, and
> flatMapWithState to filter out duplicate results.
> {code}
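> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.scala.extensions._
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
>
> case class Visit(uuid: String, time: Long, platform: Int)
>
> case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> // val consumer: FlinkKafkaConsumer08[Visit] (Kafka source, defined elsewhere)
>
> val stream =
>   env.addSource(consumer)
>     .keyBy(_.platform)
>     // one tumbling window per day, re-evaluated every second
>     .timeWindow(Time.days(1))
>     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
>     .applyWith((0, Set.empty[Int], 0L, 0L))(
>       // collect the uuid hashes seen so far (hash collisions undercount)
>       foldFunction = {
>         case ((_, set, _, _), visit) =>
>           (visit.platform, set + visit.uuid.hashCode, 0L, 0L)
>       },
>       // attach the window bounds to the accumulated set
>       windowFunction = {
>         case (_, window, results) =>
>           results.map {
>             case (platform, set, _, _) =>
>               (platform, set, window.getStart, window.getEnd)
>           }
>       }
>     )
>     .mapWith {
>       case (key, set, windowStart, windowEnd) =>
>         WindowUv(key, set.size, windowStart, windowEnd)
>     }
>     .keyBy(uv => (uv.platform, uv.windowStart))
>     // emit only when the count differs from the last one seen for this key;
>     // the state is typed Long to match WindowUv.uv
>     .flatMapWithState[WindowUv, Long] {
>       case (WindowUv(key, num, begin, end), curr) =>
>         curr match {
>           case Some(numCurr) if numCurr == num =>
>             (Seq.empty, Some(num))
>           case _ =>
>             (Seq(WindowUv(key, num, begin, end)), Some(num))
>         }
>     }
> stream.print()
> env.execute("Boom")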
> {code}
> The problem is that, with flatMapWithState, the state for a given day is
> never updated or read again once that day has passed, yet it stays in
> memory forever and there is no way to evict it. So I think the state in
> map/flatMapWithState needs an eviction policy based on time or on global
> conditions, rather than only on the last message for a key (it is hard to
> tell whether a message is the last one at the moment it arrives).
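The time-based policy requested above can be modelled outside Flink. The following is a plain-Scala sketch, not Flink's API. A hypothetical TtlDeduplicator stores the last emitted count for each (platform, windowStart) key together with the time of its last write. It drops entries that have not been written for longer than a TTL, so a finished day's entry is eventually evicted even though no "last message" is ever recognised for it. Timestamps are passed in explicitly as milliseconds to keep the sketch deterministic.

```scala
import scala.collection.mutable

final case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)

// Hypothetical deduplicator with time-based eviction (not a Flink class).
final class TtlDeduplicator(ttlMillis: Long) {
  // (platform, windowStart) -> (last emitted count, time of last write)
  private val state = mutable.Map.empty[(Int, Long), (Long, Long)]

  /** Returns Some(in) if it should be emitted, None if the count is unchanged. */
  def process(in: WindowUv, now: Long): Option[WindowUv] = {
    evictExpired(now)
    val key = (in.platform, in.windowStart)
    state.get(key) match {
      case Some((last, _)) if last == in.uv => None // duplicate: no write
      case _ =>
        state.update(key, (in.uv, now)) // a write restarts the TTL clock
        Some(in)
    }
  }

  // Drop every entry not written for longer than the TTL. This scans all
  // keys on each call, which is fine for a sketch but not for production.
  private def evictExpired(now: Long): Unit = {
    val expired = state.iterator.collect {
      case (k, (_, ts)) if now - ts > ttlMillis => k
    }.toList
    state --= expired
  }

  def size: Int = state.size
}
```

Refreshing the timestamp only on writes mirrors the "on create and write" idea: an unchanged count does not keep an entry alive. A real implementation would more likely use timers or an ordered index than a full scan.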
--
This message was sent by Atlassian Jira
(v8.3.4#803005)