[ 
https://issues.apache.org/jira/browse/FLINK-15719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173617#comment-17173617
 ] 

Ying Z commented on FLINK-15719:
--------------------------------

Hi [~tzulitai], I would like to help modify the doc [1] to make it less 
error-prone — is that OK? Here is my test code for keyed state in Scala:
 # stateful process function to generate state
 # inputs: 1 2 3 4 5 6

 
{code:java}
// code placeholder
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, 
ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

class StatefulFunctionWithTime extends KeyedProcessFunction[Int, Int, Void] {
  var state: ValueState[Int] = _
  var updateTimes: ListState[Long] = _

  @throws[Exception]
  override def open(parameters: Configuration): Unit = {
    val stateDescriptor = new ValueStateDescriptor("state", 
createTypeInformation[Int])
//    val stateDescriptor = new ValueStateDescriptor("state", Types.INT)
    state = getRuntimeContext().getState(stateDescriptor)

    val updateDescriptor = new ListStateDescriptor("times", 
createTypeInformation[Long])
//    val updateDescriptor = new ListStateDescriptor("times", Types.LONG)
    updateTimes = getRuntimeContext().getListState(updateDescriptor)
  }

  @throws[Exception]
  override def processElement(value: Int, ctx: KeyedProcessFunction[ Int, Int, 
Void ]#Context, out: Collector[Void]): Unit = {
    state.update(value + 1)
    updateTimes.add(System.currentTimeMillis)
  }
}

object KeyedStateSample extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val fsStateBackend = new FsStateBackend("file:///tmp/chk_dir")
  env.setStateBackend(fsStateBackend)
  env.enableCheckpointing(60000)
  
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

  env.socketTextStream("127.0.0.1", 8010)
    .map(_.toInt)
    .keyBy(i => i)
    .process(new StatefulFunctionWithTime)
    .uid("my-uid")

  env.execute()
}
{code}
 
 # read the state generated by the code above, which outputs:
 # KeyedState(3,4,List(1596878053283))
KeyedState(5,6,List(1596878055023))
KeyedState(2,3,List(1596878052359))
KeyedState(4,5,List(1596878054098))
KeyedState(6,7,List(1596878056151))
KeyedState(1,2,List(1596878051332))

 
{code:java}
// code placeholder
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, 
ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

/**
 * Description:
 */
object TestReadState extends App {
  val bEnv      = ExecutionEnvironment.getExecutionEnvironment
  val savepoint = Savepoint.load(bEnv, 
"file:///tmp/chk_dir/f988137ef1df4597bebc596ef7c76626/chk-2", new 
MemoryStateBackend)
  val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction)
  keyedState.print()

  case class KeyedState(key: Int, value: Int, times: List[Long])
  class ReaderFunction extends KeyedStateReaderFunction[java.lang.Integer, 
KeyedState] {
    var state: ValueState[Int] = _
    var updateTimes: ListState[Long] = _

    @throws[Exception]
    override def open(parameters: Configuration): Unit = {
      val stateDescriptor = new ValueStateDescriptor("state", 
createTypeInformation[Int])
      state = getRuntimeContext().getState(stateDescriptor)

      val updateDescriptor = new ListStateDescriptor("times", 
createTypeInformation[Long])
      updateTimes = getRuntimeContext().getListState(updateDescriptor)
    }

    override def readKey(key: java.lang.Integer,
                         ctx: KeyedStateReaderFunction.Context,
                         out: Collector[KeyedState]): Unit = {
      val data = KeyedState(
        key,
        state.value(),
        updateTimes.get().asScala.toList)
      out.collect(data)
    }
  }
}
{code}
 

 

1. 
https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html#keyed-state

> Exceptions when using scala types directly with the State Process API
> ---------------------------------------------------------------------
>
>                 Key: FLINK-15719
>                 URL: https://issues.apache.org/jira/browse/FLINK-15719
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.9.1
>            Reporter: Ying Z
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>
> I followed these steps to generate and read states:
>  # implements the example[1] `CountWindowAverage` in Scala(exactly same), and 
> run jobA => that makes good.
>  # execute `flink cancel -s ${JobID}` => savepoints was generated as expected.
>  # implements the example[2] `StatefulFunctionWithTime` in Scala(code below), 
> and run jobB => failed, exceptions shows that "Caused by: 
> org.apache.flink.util.StateMigrationException: The new key serializer must be 
> compatible."
> ReaderFunction code as below:
> {code:java}
> // code placeholder
>   class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {
>     var countState: ValueState[(Long, Long)] = _
>     override def open(parameters: Configuration): Unit = {
>       val stateDescriptor = new ValueStateDescriptor("average", 
> createTypeInformation[(Long, Long)])
>       countState = getRuntimeContext().getState(stateDescriptor)
>     }    override def readKey(key: Long, ctx: 
> KeyedStateReaderFunction.Context, out: Collector[(Long, Long)]): Unit = {
>       out.collect(countState.value())
>     }
>   }
> {code}
> 1: 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state]
>  
> 2: 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#keyed-state]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to