Hi 时某人,

I think you've found an inconsistency in Flink's windowing API (the Java API
behaves the same way). That said, handling operator state in the context of
windows is a bit delicate, because multiple windows can be in flight at the
same time. I've pulled Aljoscha into this thread; he is more familiar with
the windowing API and can probably give you a better explanation.

I think we should either allow it, or check whether a window function
implements the Checkpointed interface and notify the user if it does.
Furthermore, I think we should document these subtle behavioural
differences better.
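
To make the second option a bit more concrete, here is a rough sketch of what
such a check could look like. It deliberately uses small stand-in traits
instead of Flink's real classes, so the names WindowFunctionWrapper,
CountingWindowFunction and validate are hypothetical and only for
illustration. It also shows why an instanceof test on a wrapper object comes
out false even though the wrapped user function implements Checkpointed.

```scala
// Stand-in types for illustration only; these are NOT Flink's real classes.
trait Checkpointed[T] {
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): T
  def restoreState(state: T): Unit
}

trait WindowFunction[IN, OUT] {
  def apply(input: Iterable[IN]): OUT
}

// Hypothetical wrapper: it delegates apply() but does not implement
// Checkpointed itself, so the interface of the wrapped function is hidden.
class WindowFunctionWrapper[IN, OUT](val wrapped: WindowFunction[IN, OUT])
    extends WindowFunction[IN, OUT] {
  override def apply(input: Iterable[IN]): OUT = wrapped.apply(input)
}

// Hypothetical user function that keeps its own state.
class CountingWindowFunction
    extends WindowFunction[Int, Int] with Checkpointed[Option[Int]] {
  private var count = 0

  override def apply(input: Iterable[Int]): Int = {
    count += input.size
    count
  }

  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Int] =
    Some(count)

  override def restoreState(state: Option[Int]): Unit = {
    count = state.getOrElse(0)
  }
}

object CheckpointedCheck {
  // Hypothetical validation: report window functions that implement
  // Checkpointed instead of silently ignoring their state.
  def validate(f: WindowFunction[_, _]): Either[String, Unit] = f match {
    case _: Checkpointed[_] =>
      Left(s"${f.getClass.getSimpleName} implements Checkpointed, " +
        "which is not supported for window functions")
    case _ => Right(())
  }

  def main(args: Array[String]): Unit = {
    val userFn: WindowFunction[Int, Int] = new CountingWindowFunction
    val wrapper: WindowFunction[Int, Int] = new WindowFunctionWrapper(userFn)

    println(wrapper.isInstanceOf[Checkpointed[_]]) // false: the wrapper hides it
    println(userFn.isInstanceOf[Checkpointed[_]])  // true
    println(validate(userFn))                      // Left(...) with the message
  }
}
```

The check has to run on the user function before it is wrapped (or on the
wrapped instance), since the wrapper itself never matches.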

Cheers,
Till

On Sat, Nov 26, 2016 at 4:47 AM, 时某人 <shijinkui...@163.com> wrote:

> Is there a design or architecture document for the state backends and
> checkpointing?
>
>
> ChengXiang Li gave a talk about checkpointing at the Hangzhou Flink Meetup:
> https://github.com/pusuo/streaming-resource/blob/master/flink-meetup-hz-20161105/Flink%E5%88%86%E5%B8%83%E5%BC%8F%E5%BF%AB%E7%85%A7%E6%B7%B1%E5%85%A5%E5%88%86%E6%9E%90_%E6%9D%8E%E5%91%88%E7%A5%A5.pdf
>
>
> Thanks
>
> At 2016-11-26 10:30:52, "liuxinchun" <liuxinc...@huawei.com> wrote:
>
>
> Hi all:
>
>
>
> I have been following Flink and ran into a problem with checkpointing in a
> user-defined window function. My code looks like this:
>
>
>
> class WindowStatistics extends WindowFunction[Event, Int, Tuple, TimeWindow]
>     with Checkpointed[Option[Int]] {
>
>   // User-defined state that should be included in checkpoints
>   private var count = 0
>
>   override def apply(key: Tuple, window: TimeWindow, input: Iterable[Event],
>       out: Collector[Int]): Unit = {
>     count = XXXX
>     XXXXXXXX
>     out.collect(count)
>   }
>
>   override def snapshotState(checkpointId: Long, checkpointTimestamp: Long):
>       Option[Int] = {
>     Some(count)
>   }
>
>   override def restoreState(state: Option[Int]): Unit = {
>     state match {
>       case Some(c) => count = c
>       case None => count = 0
>     }
>   }
> }
>
>  and
>
> env.enableCheckpointing(5000)
>
> env.setStateBackend(new RocksDBStateBackend("file:///data/"))
>
> When a checkpoint is taken, only the window contents (panes) are
> checkpointed; my user-defined state (count) is not included. While
> debugging, I found that at line 123 of AbstractUdfStreamOperator.java the
> condition
>
> if (userFunction instanceof Checkpointed) {
>      XXXXXX
> }
>
> evaluates to false (for other operators, such as map and filter, it is
> true), and userFunction is actually a ScalaWindowFunctionWrapper object.
>
> So my question is: is this a bug? If not, what is the design philosophy
> behind window checkpointing? In many scenarios users want to checkpoint
> their own state, but the design does not seem to support that. Or am I
> using checkpointing with windows incorrectly?
>
>  Thank you!
