Hi all:
I am working with Flink and have run into a problem with checkpointing a user-defined
window function. My code looks like this:
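import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Event is my own event type (definition omitted).
class WindowStatistics extends WindowFunction[Event, Int, Tuple, TimeWindow]
    with Checkpointed[Option[Int]] {
  // User-defined state that should survive failures
  private var count = 0

  override def apply(key: Tuple, window: TimeWindow, input: Iterable[Event],
      out: Collector[Int]): Unit = {
    // count = XXXX
    // XXXXXXXX
    out.collect(count)
  }

  // Called on every checkpoint: returns the state to persist
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Int] =
    Some(count)

  // Called on recovery with the last checkpointed state
  override def restoreState(state: Option[Int]): Unit = state match {
    case Some(c) => count = c
    case None    => count = 0
  }
}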
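For reference, this is the round trip I expect the function above to go through on failure. It is a minimal sketch in plain Scala with no Flink runtime: `CheckpointedLike` is a hypothetical local copy of the two `Checkpointed` methods, and `addWindow` is a hypothetical stand-in for the elided `count = XXXX` update.

```scala
// Hypothetical local copy of the two Checkpointed methods,
// so this sketch compiles without Flink on the classpath.
trait CheckpointedLike[T] {
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): T
  def restoreState(state: T): Unit
}

// Same state handling as WindowStatistics, without the window plumbing.
class CountState extends CheckpointedLike[Option[Int]] {
  private var count = 0

  def current: Int = count

  // Hypothetical stand-in for the elided per-window update of `count`.
  def addWindow(size: Int): Unit = { count += size }

  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Int] =
    Some(count)

  override def restoreState(state: Option[Int]): Unit = state match {
    case Some(c) => count = c
    case None    => count = 0
  }
}

object RoundTrip {
  // Simulates: process windows, take a checkpoint, process more, fail,
  // then restart a fresh instance and restore it from the checkpoint.
  def run(): Int = {
    val before = new CountState
    before.addWindow(3)
    before.addWindow(4)
    val snapshot = before.snapshotState(checkpointId = 1L, checkpointTimestamp = 0L)
    before.addWindow(10) // happens after the checkpoint, so it is lost on failure
    val after = new CountState
    after.restoreState(snapshot)
    after.current
  }
}
```

If the snapshot is taken, the restored instance resumes from the checkpointed count (7 here) instead of starting again at 0.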
and I enable checkpointing like this:
env.enableCheckpointing(5000)
env.setStateBackend(new RocksDBStateBackend("file:///data/"))
When a checkpoint is taken, only the data in the window (the panes) is
checkpointed; my user-defined state (count) is not included in the checkpoint.
While debugging, I found that the check at
line 123 of AbstractUdfStreamOperator.java

    if (userFunction instanceof Checkpointed) {
      XXXXXX
    }

evaluates to false for my window function (for other operators, such as map and
filter, it is true). The userFunction is actually a ScalaWindowFunctionWrapper object.
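A minimal, Flink-free sketch of why such a check can fail when the user function is wrapped. The names `Checkpointed`, `UserFunction`, `Wrapper` and `InstanceCheckDemo` are hypothetical stand-ins, not Flink's actual classes; the only assumption is that the wrapper adapts the user function without itself implementing the checkpoint interface.

```scala
// Hypothetical stand-in for the checkpoint interface.
trait Checkpointed[T] {
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): T
  def restoreState(state: T): Unit
}

// Hypothetical stand-in for the function type the operator expects.
trait UserFunction[IN, OUT] {
  def apply(input: Iterable[IN]): OUT
}

// The user's function implements both interfaces.
class CountingFunction extends UserFunction[Int, Int] with Checkpointed[Option[Int]] {
  private var count = 0
  def apply(input: Iterable[Int]): Int = { count += input.size; count }
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Int] = Some(count)
  def restoreState(state: Option[Int]): Unit = { count = state.getOrElse(0) }
}

// An adapter that forwards calls but only implements UserFunction,
// so the Checkpointed interface of the wrapped function is hidden.
class Wrapper[IN, OUT](val wrapped: UserFunction[IN, OUT]) extends UserFunction[IN, OUT] {
  def apply(input: Iterable[IN]): OUT = wrapped.apply(input)
}

object InstanceCheckDemo {
  val userFn = new CountingFunction
  val wrapped: AnyRef = new Wrapper(userFn)

  // Mirrors `userFunction instanceof Checkpointed` in the operator.
  def isCheckpointed(fn: AnyRef): Boolean = fn.isInstanceOf[Checkpointed[_]]
}
```

The check succeeds on the user function itself but fails on the wrapper, so an operator that only sees the wrapper would never call `snapshotState` on the wrapped function.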
So, my question is: is this a bug? If not, what is the design rationale behind
checkpointing in windows? In many scenarios users may want to checkpoint their own
user-defined state, but the current design does not seem to support this. Or am I
using window checkpointing incorrectly?
Thank you!
刘新春 00354552
Big Data Technology Development Department