[
https://issues.apache.org/jira/browse/FLINK-31970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17723194#comment-17723194
]
Yordan Pavlov commented on FLINK-31970:
---------------------------------------
First and foremost thank you for your input [~pnowojski]
??First problem is that your {{TestWindow#snapshoState}} method is trying to
access {{state}} field, in a keyed operator/function, inside a method that
doesn't have key context.??
Looking at the comments in the [source
code|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java],
I was left with the impression that CheckpointedFunction can be used with a
keyed state:
{quote}While more lightweight interfaces exist as
* shortcuts for various types of state, this interface offer the greatest
flexibility in managing
* both <i>keyed state</i> and <i>operator state</i>.{quote}
??First problem is that your TestWindow#snapshoState method is trying to access
state field, in a keyed operator/function, inside a method that doesn't have
key context. That's the error I was getting when I tried to run your code.??
This error seems helpful, unfortunately I don't seem to get it. I have a simple
build.sbt which builds a jar file, which I then run on a locally running
cluster. Let me know if you run it differently.
??However even that is kind of strange. You are creating a tumbling window
every 1ms, only to aggregate the count across all past tumbling windows for the
given key???
Obviously this is a rather simplified example of a more complex problem I try
to solve. I want to construct a very huge state, which is updated as result of
tumbling window data. However, I want to access the RocksDB backend as rarely
as possible, only when checkpoint is being constructed. Between two checkpoints
the state would change drastically, so I do not want to preserve intermediate
changes, If I use a {{ValueState}} it can potentially spill to disk on each
{{.update}} call in-between checkpoints. Do you see better approach to achieve
this?
Again thank you for your time.
> "Key group 0 is not in KeyGroupRange" when using CheckpointedFunction
> ---------------------------------------------------------------------
>
> Key: FLINK-31970
> URL: https://issues.apache.org/jira/browse/FLINK-31970
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.17.0
> Reporter: Yordan Pavlov
> Priority: Major
> Attachments: fill-topic.sh, main.scala
>
>
> I am experiencing a problem where the following exception would be thrown on
> Flink stop (stop with savepoint):
>
> {code:java}
> org.apache.flink.util.SerializedThrowable:
> java.lang.IllegalArgumentException: Key group 0 is not in
> KeyGroupRange{startKeyGroup=86, endKeyGroup=127}.{code}
>
> I do not have a non deterministic keyBy() operator in fact, I use
> {code:java}
> .keyBy(_ => 1){code}
> I believe the problem is related to using RocksDB state along with a
> {code:java}
> CheckpointedFunction{code}
> In my test program I have commented out a reduction of the parallelism which
> would make the problem go away. I am attaching a standalone program which
> presents the problem and also a script which generates the input data. For
> clarity I would paste here the essence of the job:
>
>
> {code:scala}
> env.fromSource(kafkaSource, watermarkStrategy, "KafkaSource")
> .setParallelism(3)
> .keyBy(_ => 1)
> .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.MILLISECONDS)))
> .apply(new TestWindow())
> /* .setParallelism(1) this would prevent the problem */
> .uid("window tester")
> .name("window tester")
> .print()
> class TestWindow() extends WindowFunction[(Long, Int), Long, Int, TimeWindow]
> with CheckpointedFunction {
> var state: ValueState[Long] = _
> var count = 0
> override def snapshotState(functionSnapshotContext:
> FunctionSnapshotContext): Unit = {
> state.update(count)
> }
> override def initializeState(context: FunctionInitializationContext): Unit
> = {
> val storeDescriptor = new
> ValueStateDescriptor[Long]("state-xrp-dex-pricer",
> createTypeInformation[Long])
> state = context.getKeyedStateStore.getState(storeDescriptor)
> }
> override def apply(key: Int, window: TimeWindow, input: Iterable[(Long,
> Int)], out: Collector[Long]): Unit = {
> count += input.size
> out.collect(count)
> }
> }{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)