Hi,
this is indeed a bug (though I would see it more as a feature since I think
using the Checkpointed interface there can indeed be problematic, as Till
pointed out). The problem is that the Scala Wrapper functions have to
implement all kinds of interfaces so that they can forward to the wrapped
function. Or we would have to have a wrapper function for each combination
of interfaces that a user function can implement.

In the long run, our use of interfaces for user functions does not seem to
scale well in the Scala API.

Cheers,
Aljoscha

On Mon, 28 Nov 2016 at 10:49 Till Rohrmann <trohrm...@apache.org> wrote:

> Hi 时某人,
>
> I think you've found an inconsistency in Flink's windowing API (but it's
> the same in the Java API). Handling operator state in the context of
> windows is a little bit delicate because you could have multiple windows in
> flight, though. I've pulled Aljoscha in this thread who is more familiar
> with the windowing API and can give you probably a better explanation.
>
> I think either we allow it or we check that a window function does not
> implement the Checkpointed interface and if it does, then notify the user
> about it. Furthermore, I think we should document these subtle behaviour
> differences better.
>
> Cheers,
> Till
>
> On Sat, Nov 26, 2016 at 4:47 AM, 时某人 <shijinkui...@163.com> wrote:
>
> > Is there some State backend and checkpoint design architecture document?
> >
> >
> > ChengXiang Li have some sharing about checkout on Hangzhou Flink Meetup.
> > https://github.com/pusuo/streaming-resource/blob/master/flink-meetup-hz-
> > 20161105/Flink%E5%88%86%E5%B8%83%E5%BC%8F%E5%BF%AB%E7%85%A7%
> > E6%B7%B1%E5%85%A5%E5%88%86%E6%9E%90_%E6%9D%8E%E5%91%88%E7%A5%A5.pdf
> >
> >
> > Thanks
> >
> > At 2016-11-26 10:30:52, "liuxinchun" <liuxinc...@huawei.com> wrote:
> >
> >
> > Hi all:
> >
> >
> >
> > I am paying attention to Flink, and encounter a problem about user
> defined
> > window with checkpoint. My code like this:
> >
> >
> >
> > class WindowStatistics extends WindowFunction[Event, Int, Tuple,
> > TimeWindow] with Checkpointed[Option[Int]]: Unit = {
> >
> >
> >
> >          private var count = 0
> >
> >
> >
> > override def apply(key: Tuple, window: TimeWindow, input:
> Iterator[Event],
> > out: Collector[Int]): Unit = {
> >
> >          count = XXXX
> >
> >          XXXXXXXX
> >
> >          out.collect(count)
> >
> > }
> >
> > override def snapshotState(checkpointId: Long, checkpointTimestamp:
> Long):
> > Option[Int] = {
> >
> >           Some(count)
> >
> > }
> >
> >
> >
> > Override def restoreState(state: Option[Int]): Unit = {
> >
> >           state match {
> >
> >                    case Some(c) => count = c
> >
> >                    case None => count = 0
> >
> >          }
> >
> > }
> >
> > }
> >
> >  and
> >
> > env.enableCheckpointing(5000)
> >
> > env.setStateBackend(new RocksDBStateBackend(“file:///data/”))
> >
> >  when making checkpoint, my window only make checkpoint of data in
> > window(panes), but user defined state(count) is not contained in
> > checkpoint. When debugging, I found in
> >
> >  line 123 of AbstractUdfStreamOperator.java
> >
> > if (userFunction instanceof Checkpointed) {
> >
> >      XXXXXX
> >
> > }
> >
> > is false(other operators, like map, filter is true). And userFunction is
> > actually a ScalaWindowFunctionWrapper object.
> >
> >  So, my question is : Is it a bug? If not, what is the design philosophy
> > of window’s checkpoint? In many scenes, users may want to checkpoint
> their
> > own defined states, but the design does not support seemingly. Or my
> method
> > of window’s checkpoint application is wrong?
> >
> >  Thank you!
>

Reply via email to