Hi all,

   Thanks very much. I want to debug a checkpoint from code. Below is my code.
Sorry, I don't understand what you mean by a UT class.
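import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala._

// STATE_BACKEND (checkpoint directory URI) and streamSource are defined elsewhere.
def demo(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.enableCheckpointing(10000) // trigger a checkpoint every 10 s
  val checkpointConfig = env.getCheckpointConfig
  checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  checkpointConfig.setMinPauseBetweenCheckpoints(5000)
  checkpointConfig.setCheckpointTimeout(5000)
  checkpointConfig.setMaxConcurrentCheckpoints(1)
  // Keep the last completed checkpoint on disk when the job is cancelled.
  checkpointConfig.enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  val fsStateBackend: StateBackend = new FsStateBackend(STATE_BACKEND)
  env.setStateBackend(fsStateBackend)
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 30000))

  // TODO: recover from my checkpoint here, i.e. run this job starting from my checkpoint.
  // How do I run this job from the checkpoint metadata? Should I use CheckpointCoordinator?
  val dataStream: DataStream[String] = env.addSource(streamSource).name("mysource")
  dataStream.addSink(new MySQLSink).uid("tesCheckpoint").name("mysink")
  env.execute()
}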
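A minimal sketch for the TODO above, assuming Flink 1.10 or later (the 1.9 local environment may not read this option). On a cluster, a retained checkpoint is normally resumed from the command line with `flink run -s <path-to-chk-dir> ...`; for a local run in the IDE, the sketch instead sets the `execution.savepoint.path` option on a local environment, which should make the job start from that checkpoint rather than from empty state, without using CheckpointCoordinator directly. `ResumeFromCheckpoint`, `restoreConfig` and `localEnvFrom` are hypothetical helper names.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical helper; assumes Flink 1.10+, where "execution.savepoint.path"
// is read from the configuration when the job graph is built.
object ResumeFromCheckpoint {

  // Configuration that points the job at a retained checkpoint (chk-<n> dir) or savepoint.
  def restoreConfig(checkpointPath: String): Configuration = {
    val conf = new Configuration()
    conf.setString("execution.savepoint.path", checkpointPath)
    conf
  }

  // Local (IDE) environment with parallelism 1 that starts from the given checkpoint.
  def localEnvFrom(checkpointPath: String): StreamExecutionEnvironment =
    new StreamExecutionEnvironment(
      JavaEnv.createLocalEnvironment(1, restoreConfig(checkpointPath)))
}
```

To try it, `StreamExecutionEnvironment.getExecutionEnvironment` in `demo()` would be replaced by something like `ResumeFromCheckpoint.localEnvFrom("file:///tmp/flink-checkpoints/<job-id>/chk-<n>")` (hypothetical path). State is matched to operators by uid on restore, so giving the source a `.uid(...)` as well, not only the sink, probably makes restores more robust.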
MySQLSink:
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, OperatorStateStore}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

import scala.collection.mutable.ListBuffer

class MySQLSink extends RichSinkFunction[String] with CheckpointedFunction {

  private val bufferSize = 50
  private val cacheData: ListBuffer[String] = ListBuffer[String]()
  private var checkpointedState: ListState[(String, ListBuffer[String])] = _

  override def invoke(jsonData: String, context: SinkFunction.Context[_]): Unit = {
    val end: Long = System.currentTimeMillis()
    // Insert an "fend" field (processing time) before the closing brace of the JSON object.
    val result = jsonData.substring(0, jsonData.length - 1) + ",\"fend\":" + end + "}"
    cacheData += result
    // Flush based on the buffer size itself: a separate counter would restart at 0
    // after a restore and ignore the records refilled in initializeState.
    if (cacheData.size >= bufferSize) {
      saveDataList()
      cacheData.clear()
    }
  }

  def saveDataList(): Unit = {
    // TODO: write cacheData to MySQL (omitted here)
  }

  override def close(): Unit = {
    super.close()
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Replace the previous snapshot with the records still waiting in the buffer.
    checkpointedState.clear()
    checkpointedState.add(("nlcpTestData", cacheData))
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val listStateDesc = new ListStateDescriptor[(String, ListBuffer[String])](
      "nlcpTestData",
      TypeInformation.of(new TypeHint[(String, ListBuffer[String])]() {}))
    val stateStore: OperatorStateStore = context.getOperatorStateStore
    checkpointedState = stateStore.getListState(listStateDesc)
    if (context.isRestored) {
      // Refill the buffer with the records that had not been flushed at checkpoint time.
      val data = checkpointedState.get().iterator()
      while (data.hasNext) {
        cacheData ++= data.next()._2
      }
    }
  }

}
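To debug what the sink actually stored, here is a sketch using the State Processor API (the `flink-state-processor-api` dependency Jark mentions below), written against the Flink 1.9 API, where `Savepoint.load` also takes a state backend argument. It reads the operator `ListState` named `nlcpTestData` from the operator with uid `tesCheckpoint`, using the same `TypeInformation` as `initializeState`. The documentation describes loading savepoints; pointing it at a retained `chk-<n>` directory is an assumption here. `InspectSinkState` and `readBufferedRecords` are hypothetical names.

```scala
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

// Hypothetical inspection tool; assumes flink-state-processor-api 1.9.x on the classpath.
object InspectSinkState {

  // Must match the TypeInformation used in MySQLSink.initializeState exactly,
  // otherwise the stored bytes cannot be deserialized. (createTypeInformation would be
  // more idiomatic in Scala, but changing it would break existing checkpoints.)
  val sinkStateType: TypeInformation[(String, ListBuffer[String])] =
    TypeInformation.of(new TypeHint[(String, ListBuffer[String])]() {})

  // Reads the operator ListState "nlcpTestData" of the operator with uid "tesCheckpoint".
  def readBufferedRecords(path: String): Seq[(String, ListBuffer[String])] = {
    val bEnv = ExecutionEnvironment.getExecutionEnvironment
    val savepoint = Savepoint.load(bEnv, path, new MemoryStateBackend())
    savepoint
      .readListState("tesCheckpoint", "nlcpTestData", sinkStateType)
      .collect() // runs a local batch job and returns the entries to the client
      .asScala
  }

  def main(args: Array[String]): Unit = {
    // args(0): path to a savepoint or retained chk-<n> directory (hypothetical usage).
    readBufferedRecords(args(0)).foreach { case (name, records) =>
      println(s"$name: ${records.size} buffered records")
      records.foreach(println)
    }
  }
}
```

Since this runs as an ordinary local batch job, it can be started and stepped through in the IDE without going through flink-runtime internals.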


> On 7 Nov 2019, at 12:03, Congxian Qiu <qcx978132...@gmail.com> wrote:
> 
> Hi,
> If you just want to debug, maybe you can do this in a unit test (UT) class in
> the flink-runtime module :) so that you don't need to deal with the
> dependency and access problems.
> 
> Best,
> Congxian
> 
> 
> On Wed, 6 Nov 2019 at 15:39, Jark Wu <imj...@gmail.com> wrote:
> 
>> Btw, user questions should be asked on user@f.a.o or user-zh@f.a.o. The dev
>> ML is mainly used to discuss development.
>> 
>> Best,
>> Jark
>> 
>> On Wed, 6 Nov 2019 at 15:36, Jark Wu <imj...@gmail.com> wrote:
>> 
>>> Hi,
>>> 
>>> Savepoint.load(env, path) is part of the State Processor API library; you
>>> need to add the following dependency to your project.
>>> 
>>> <dependency>
>>>  <groupId>org.apache.flink</groupId>
>>>  <artifactId>flink-state-processor-api_2.11</artifactId>
>>>  <version>1.9.1</version>
>>> </dependency>
>>> 
>>> 
>>> You can see the documentation for more detailed instructions [1].
>>> 
>>> Best,
>>> Jark
>>> 
>>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
>>> 
>>> On Wed, 6 Nov 2019 at 09:21, qq <471237...@qq.com> wrote:
>>> 
>>>> Hi all,
>>>>   I want to load checkpoint or savepoint metadata in my dev environment,
>>>> because I want to debug the saved checkpoint metadata. I know Flink
>>>> provides an API for this, Savepoint.load(env, path), but I can't find it
>>>> and can't use it. Does anyone know about this? Could you help me? Thanks
>>>> very much!
>>>> 
>>>> 
>> 
> 
