Hi all, Thanks very much. I want to debug a checkpoint with code. Below is my code. Also, I am sorry, but I don’t understand what the UT class is. def demo(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.enableCheckpointing(10000) val checkpointConfig = env.getCheckpointConfig checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) checkpointConfig.setMinPauseBetweenCheckpoints(5000) checkpointConfig.setCheckpointTimeout(5000) checkpointConfig.setMaxConcurrentCheckpoints(1) checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) val fsStateBackend: StateBackend = new FsStateBackend(STATE_BACKEND) env.setStateBackend(fsStateBackend) env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 30000))
//TODO recovery my checkpoint here or run this job from my checkpoint // how to run this job with checkpoint metadata ? use CheckpointCoordinator ?? val dataStream: DataStream[String] = env.addSource(streamSource).name("mysource") dataStream.addSink(new MySQLSink).uid("tesCheckpoint").name("mysink") env.execute() } MySQLSink: class MySQLSink extends RichSinkFunction[String] with CheckpointedFunction { private val bufferSize = 50 private var count: AtomicInteger = _ private var cacheData: ListBuffer[String] = ListBuffer[String]() private var checkpointedState: ListState[(String, ListBuffer[String])] = _ override def open(parameters: Configuration): Unit = { count = new AtomicInteger(0) } override def invoke(jsonData: String, context: SinkFunction.Context[_]): Unit = { val flag = count.getAndIncrement() val end: Long = System.currentTimeMillis() val result = jsonData.substring(0,jsonData.length-1) + ",\"fend\":"+end+"}"; if (flag >= bufferSize) { cacheData += result saveDataList() cacheData.clear() count.set(1) } else { cacheData += result } } def saveDataList(): Unit = { } override def close(): Unit = { super.close() } override def snapshotState(context: FunctionSnapshotContext): Unit = { checkpointedState.clear() val buffer = ListBuffer[(String, ListBuffer[String])](("nlcpTestData", cacheData)) checkpointedState.addAll(buffer.toList.asJava) } override def initializeState(context: FunctionInitializationContext): Unit = { val listStateDesc = new ListStateDescriptor[(String, ListBuffer[String])]("nlcpTestData", TypeInformation.of(new TypeHint[(String, ListBuffer[String])]() {})) val stateStore: OperatorStateStore = context.getOperatorStateStore checkpointedState = stateStore.getListState(listStateDesc) if (context.isRestored) { val data = checkpointedState.get().iterator() while (data.hasNext) { cacheData ++= data.next()._2 } } } } > 在 2019年11月7日,12:03,Congxian Qiu <qcx978132...@gmail.com> 写道: > > Hi, > If you just want to debug, maybe you can do this in UT class in 
module > flink-runtime :) so that you do not need to handle the dependency problem, > and access problem. > > Best, > Congxian > > > Jark Wu <imj...@gmail.com> 于2019年11月6日周三 下午3:39写道: > >> Btw, user questions should be asked in user@f.a.o or user-zh@f.a.o. The >> dev >> ML is mainly used to discuss development. >> >> Best, >> Jark >> >> On Wed, 6 Nov 2019 at 15:36, Jark Wu <imj...@gmail.com> wrote: >> >>> Hi, >>> >>> Savepoint.load(env, path) is in state processor API library, you should >>> add the following dependency in your project. >>> >>> <dependency> >>> <groupId>org.apache.flink</groupId> >>> <artifactId>flink-state-processor-api_2.11</artifactId> >>> <version>1.9.1</version> >>> </dependency> >>> >>> >>> You can see the documentation for more detailed instructions [1]. >>> >>> Best, >>> Jark >>> >>> [1]: >>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html >>> >>> On Wed, 6 Nov 2019 at 09:21, qq <471237...@qq.com> wrote: >>> >>>> Hi all, >>>> I want to load checkpoint or savepoint metadata on dev . in this case >>>> , I want to debug saved checkpoint metadata. And I knew flink provided a >>>> api which is Savepoint.load(env, path), but I can’t find it and can’t >> use >>>> it. Anyone who know about this ? Could you help me ? Thanks very much; >>>> >>>> >> >