Hi community,
When I use state in Scala, something makes confused, I followed these steps to
generate and read states:
a. implements the example[1] `CountWindowAverage` in Scala(exactly same), and
run jobA => that makes good.
b. execute `flink cancel -s ${JobID}` => savepoints was generated as expected.
c. implements the example[2] `StatefulFunctionWithTime` in Scala(code below),
and run jobB => failed, exceptions shows that "Caused by:
org.apache.flink.util.StateMigrationException: The new key serializer must be
compatible."
ReaderFunction code as below:
```
class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {
var countState: ValueState[(Long, Long)] = _
override def open(parameters: Configuration): Unit = {
val stateDescriptor = new ValueStateDescriptor("average",
createTypeInformation[(Long, Long)])
countState = getRuntimeContext().getState(stateDescriptor)
}
override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context, out:
Collector[(Long, Long)]): Unit = {
out.collect(countState.value())
}
}
```
d. then I try to use java.lang.Long instead of Long in key-type, and run jobB
=> exception just disappeared and that makes good.
This makes me confused. Did I miss some features in State-Processing-API, such
as `magic-implicits`?
And when I change `xxx.keyBy(_._1)` to `xxx.keyBy(0)`,same exception comes
again,this time I tried to use Tuple(java.lang.Long) or something else, but
does not work.
Hope.
1:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
2:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state