[ 
https://issues.apache.org/jira/browse/FLINK-4428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey Zagrebin closed FLINK-4428.
----------------------------------
    Resolution: Resolved

State TTL for processing time has been implemented in FLINK-3089.
I am closing the issue. Please reopen it if you think we need more discussion
here.
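
For anyone landing here with the use case from the description, below is a minimal sketch of how the deduplication step could be written against the state TTL API instead of flatMapWithState. It is an illustration under assumptions, not code from FLINK-3089: the class name DedupUv, the state name "lastUv" and the 2-day TTL are made up for this example, and WindowUv mirrors the case class from the description. It assumes a Flink version where StateTtlConfig.newBuilder accepts org.apache.flink.api.common.time.Time; newer releases may expect a different duration type.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Mirrors the case class from the issue description.
case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)

// Hypothetical replacement for the flatMapWithState step: emits a WindowUv
// only when the count for the current key differs from the last emitted one.
class DedupUv extends RichFlatMapFunction[WindowUv, WindowUv] {

  @transient private var lastUv: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    // Assumption: 2 days is comfortably longer than the 1-day window.
    val ttlConfig = StateTtlConfig
      .newBuilder(Time.days(2))
      // The TTL timer is reset only when the state is created or written.
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      // Expired values are never returned, even if not yet cleaned up.
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build()

    val descriptor =
      new ValueStateDescriptor[java.lang.Long]("lastUv", classOf[java.lang.Long])
    descriptor.enableTimeToLive(ttlConfig)
    lastUv = getRuntimeContext.getState(descriptor)
  }

  override def flatMap(in: WindowUv, out: Collector[WindowUv]): Unit = {
    val previous = lastUv.value() // null if never set or already expired
    if (previous == null || previous.longValue != in.uv) {
      lastUv.update(in.uv)
      out.collect(in)
    }
  }
}
```

It would be applied after the same keyBy as in the description, e.g. `.keyBy(uv => (uv.platform, uv.windowStart)).flatMap(new DedupUv)`. Since a key receives no more writes once its day has passed, its entry expires after the TTL. How promptly expired entries are physically removed depends on the Flink version and the configured cleanup strategy.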

> Method map/flatMapWithState may need a eviction policy
> ------------------------------------------------------
>
>                 Key: FLINK-4428
>                 URL: https://issues.apache.org/jira/browse/FLINK-4428
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>    Affects Versions: 1.1.2
>            Reporter: Renkai Ge
>            Priority: Major
>
> I want to count the number of unique visitors of a website every day.
> If the number changes, I want to get the newest number within 1 second,
> and nothing should be emitted if the number doesn't change. I implemented
> this with a time window of 1 day, a trigger of 1 second, and
> flatMapWithState to filter out duplicate results.
> {code}
>      //    case class Visit(uuid: String, time: Long, platform: Int)
>  
>      //    case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)
>  
>      //  val consumer: FlinkKafkaConsumer08[Visit]
>      val stream =
>      env.addSource(consumer)
>        .keyBy(_.platform)
>        .timeWindow(Time.days(1))
>        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
>        .applyWith((0, Set.empty[Int], 0L, 0L))(
>          foldFunction = {
>            case ((_, set, _, 0), visit) =>
>              (visit.platform, set + visit.uuid.hashCode, 0, 0)
>          },
>          windowFunction = {
>            case (key, window, results) =>
>              results.map {
>                case (platform, set, _, _) =>
>                  (platform, set, window.getStart, window.getEnd)
>              }
>          }
>        )
>        .mapWith {
>          case (key, set, windowStart, windowEnd) =>
>            WindowUv(key, set.size, windowStart, windowEnd)
>        }
>        .keyBy(uv => (uv.platform, uv.windowStart))
>        .flatMapWithState[WindowUv, Long] {
>        case (WindowUv(key, num, begin, end), curr) =>
>          curr match {
>            case Some(numCurr) if numCurr == num =>
>              (Seq.empty, Some(num))
>            case _ =>
>              (Seq(WindowUv(key, num, begin, end)), Some(num))
>          }
>      }
>      stream.print()
>      env.execute("Boom")
> {code}
> The problem is that, because I used flatMapWithState, the state for a
> given day is never updated or read again once that day has passed, yet it
> stays in memory forever and there is no way to evict it. So I think the
> state in map/flatMapWithState may need an eviction policy based on time or
> global conditions, rather than only on the last message for a key (it is
> hard to tell whether a message is the last one at the moment it arrives).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
