Yordan Pavlov created FLINK-31970:
-------------------------------------
Summary: "Key group 0 is not in KeyGroupRange" when using
CheckpointedFunction
Key: FLINK-31970
URL: https://issues.apache.org/jira/browse/FLINK-31970
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.17.0
Reporter: Yordan Pavlov
Attachments: fill-topic.sh, main.scala
I am experiencing a problem where the following exception is thrown on
Flink stop (stop with savepoint):
{code:java}
org.apache.flink.util.SerializedThrowable: java.lang.IllegalArgumentException:
Key group 0 is not in KeyGroupRange{startKeyGroup=86, endKeyGroup=127}.{code}
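For context, the range in the message lines up with Flink's key-group partitioning. A minimal sketch (an assumption on my part: it mirrors the formula in Flink's KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex, with the default maxParallelism of 128 and the job's parallelism of 3) yields exactly the 86-127 range for the third subtask:

{code:scala}
// Sketch of Flink's key-group range assignment; not Flink code itself,
// but a reconstruction of the formula in
// KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex.
// maxParallelism = 128 (Flink's default) and parallelism = 3 are assumed
// to match this job.
object KeyGroupSketch {
  def rangeForOperator(maxParallelism: Int,
                       parallelism: Int,
                       operatorIndex: Int): (Int, Int) = {
    val start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism
    val end   = ((operatorIndex + 1) * maxParallelism - 1) / parallelism
    (start, end)
  }

  def main(args: Array[String]): Unit = {
    // Subtasks 0..2 own key groups 0-42, 43-85 and 86-127 respectively;
    // the exception's KeyGroupRange{86, 127} is the third subtask's range,
    // so key group 0 belongs to a different subtask.
    (0 until 3).foreach { i =>
      println(s"subtask $i -> ${rangeForOperator(128, 3, i)}")
    }
  }
}
{code}

With a constant key, every record hashes into a single key group, so only one of the three ranges should ever be touched; the exception suggests key group 0 is being accessed from the subtask owning 86-127.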
I do not have a non-deterministic keyBy() selector; in fact, I use
{code:java}
.keyBy(_ => 1){code}
I believe the problem is related to using RocksDB state along with a
{code:java}
CheckpointedFunction{code}
In my test program I have commented out a setParallelism(1) call which,
when enabled, makes the problem go away. I am attaching a standalone program
which reproduces the problem, along with a script which generates the input
data. For clarity, here is the essence of the job:
{code:scala}
env.fromSource(kafkaSource, watermarkStrategy, "KafkaSource")
  .setParallelism(3)
  .keyBy(_ => 1)
  .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.MILLISECONDS)))
  .apply(new TestWindow())
  /* .setParallelism(1) this would prevent the problem */
  .uid("window tester")
  .name("window tester")
  .print()

class TestWindow() extends WindowFunction[(Long, Int), Long, Int, TimeWindow]
    with CheckpointedFunction {

  var state: ValueState[Long] = _
  var count = 0

  override def snapshotState(functionSnapshotContext: FunctionSnapshotContext): Unit = {
    state.update(count)
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val storeDescriptor =
      new ValueStateDescriptor[Long]("state-xrp-dex-pricer", createTypeInformation[Long])
    state = context.getKeyedStateStore.getState(storeDescriptor)
  }

  override def apply(key: Int, window: TimeWindow,
      input: Iterable[(Long, Int)], out: Collector[Long]): Unit = {
    count += input.size
    out.collect(count)
  }
}{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)