Hi community,

When I use state in Scala, something confuses me. I followed these steps to 
generate and read state:

a. Implemented the example[1] `CountWindowAverage` in Scala (exactly the same) 
and ran jobA => this works fine.

b. Executed `flink cancel -s ${JobID}` => the savepoint was generated as expected.

c. Implemented the example[2] `StatefulFunctionWithTime` in Scala (code below) 
and ran jobB => it failed, and the exception shows "Caused by: 
org.apache.flink.util.StateMigrationException: The new key serializer must be 
compatible."


The ReaderFunction code is as follows:

```
class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {

  var countState: ValueState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    val stateDescriptor =
      new ValueStateDescriptor("average", createTypeInformation[(Long, Long)])
    countState = getRuntimeContext().getState(stateDescriptor)
  }

  override def readKey(
      key: Long,
      ctx: KeyedStateReaderFunction.Context,
      out: Collector[(Long, Long)]): Unit = {
    out.collect(countState.value())
  }
}
```

d. Then I tried using java.lang.Long instead of Long as the key type and ran 
jobB => the exception disappeared and everything works fine.
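
For reference, here is a minimal sketch of the variant from step d, where only the key type parameter differs from the ReaderFunction above. It assumes Flink 1.9 with the `flink-state-processor-api` and Scala API dependencies on the classpath; the class name `BoxedKeyReaderFunction` is made up for illustration.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

// Hypothetical name; same logic as ReaderFunction, but the key type is the
// boxed java.lang.Long instead of scala.Long.
class BoxedKeyReaderFunction
    extends KeyedStateReaderFunction[java.lang.Long, (Long, Long)] {

  private var countState: ValueState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    // The state name "average" must match the name used by jobA.
    val stateDescriptor = new ValueStateDescriptor[(Long, Long)](
      "average", createTypeInformation[(Long, Long)])
    countState = getRuntimeContext.getState(stateDescriptor)
  }

  override def readKey(
      key: java.lang.Long,
      ctx: KeyedStateReaderFunction.Context,
      out: Collector[(Long, Long)]): Unit = {
    // Emit the stored (count, sum) pair for the current key.
    out.collect(countState.value())
  }
}
```

The state descriptor and the output type are unchanged, so the only difference between the failing and the working reader is the declared key type.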

This confuses me. Did I miss some feature of the State Processor API, such 
as some `magic-implicits`?

Also, when I change `xxx.keyBy(_._1)` to `xxx.keyBy(0)`, the same exception 
comes up again. This time I tried Tuple(java.lang.Long) and a few other key 
types, but none of them worked.
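
To make the difference between the two `keyBy` calls concrete, here is a small sketch of the key types they produce. It assumes the Flink 1.9 Scala DataStream API; the object name and the sample elements are made up, and the job is only built, not executed.

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._

// Hypothetical object, only meant to show the static key types.
object KeyByTypes {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input: DataStream[(Long, Long)] = env.fromElements((1L, 3L), (1L, 5L))

    // Key selector function: the key type is scala.Long.
    val bySelector: KeyedStream[(Long, Long), Long] = input.keyBy(_._1)

    // Field position: the key type is Flink's Java Tuple wrapper, not Long.
    val byPosition: KeyedStream[(Long, Long), Tuple] = input.keyBy(0)
  }
}
```

So with `keyBy(0)` the key written by jobA is a Flink Java tuple rather than a plain `Long`, which may be why changing only the reader's key type to a boxed `Long` no longer helps.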

Hope someone can help.

1: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state

2: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
