[ 
https://issues.apache.org/jira/browse/FLINK-31970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17723386#comment-17723386
 ] 

Piotr Nowojski commented on FLINK-31970:
----------------------------------------

Yes, {{CheckpointedFunction}} can be used with a keyed state. Just keep in mind 
that with keyed state, any Flink's state field is a kind of proxy/wrapper, that 
for different keys is referencing a different value. For example if you have 
integer elements and {{keyBy(x -> x % 2)}}:

{code:java}
public YourFunction ... { 
  ValueState x = ...;
  (...)
}
{code}
then between calls
{code:java}
yourFunction.processElement(1);
yourFunction.processElement(2);
{code}
{{x}} will be referencing different objects. While between calls:
{code:java}
yourFunction.processElement(1);
yourFunction.processElement(3);
{code}
{{x}} will be referencing the same object.

Now the issue is that for {{CheckpointedFunction#snapshotState}} or 
{{CheckpointedFunction#initializeState}} calls, we are not processing any 
element, so we don't have any key, so who knows where {{x}} will be pointing 
to. Maybe to the last processed key? I don't know.

I was testing your code from IDE after creating an [integration 
test|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource]

{quote}
Obviously this is a rather simplified example of a more complex problem I try 
to solve. I want to construct a very huge state, which is updated as result of 
tumbling window data. However, I want to access the RocksDB backend as rarely 
as possible, only when checkpoint is being constructed. Between two checkpoints 
the state would change drastically, so I do not want to preserve intermediate 
changes, If I use a ValueState it can potentially spill to disk on each .update 
call in-between checkpoints. Do you see better approach to achieve this?
{quote}
I see. However you have to store the intermediate changes somewhere. If they 
fit into memory, and your primary goal is to not touch disks while processing 
elements, then I would suggest to use {{HashMapStateBackend}}. In that case 
your state would be flushed to disks only on checkpoints. If your intermediate 
changes might not fit into memory, then you have no choice, you have to use 
{{RocksDB}} all the time.


> "Key group 0 is not in KeyGroupRange" when using CheckpointedFunction
> ---------------------------------------------------------------------
>
>                 Key: FLINK-31970
>                 URL: https://issues.apache.org/jira/browse/FLINK-31970
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.17.0
>            Reporter: Yordan Pavlov
>            Priority: Major
>         Attachments: fill-topic.sh, main.scala
>
>
> I am experiencing a problem where the following exception would be thrown on 
> Flink stop (stop with savepoint):
>  
> {code:java}
> org.apache.flink.util.SerializedThrowable: 
> java.lang.IllegalArgumentException: Key group 0 is not in 
> KeyGroupRange{startKeyGroup=86, endKeyGroup=127}.{code}
>  
> I do not have a non deterministic keyBy() operator in fact, I use 
> {code:java}
> .keyBy(_ => 1){code}
> I believe the problem is related to using RocksDB state along with a 
> {code:java}
> CheckpointedFunction{code}
> In my test program I have commented out a reduction of the parallelism which 
> would make the problem go away. I am attaching a standalone program which 
> presents the problem and also a script which generates the input data. For 
> clarity I would paste here the essence of the job:
>  
>  
> {code:scala}
> env.fromSource(kafkaSource, watermarkStrategy, "KafkaSource")
> .setParallelism(3)
> .keyBy(_ => 1)
> .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.MILLISECONDS)))
> .apply(new TestWindow())
> /* .setParallelism(1) this would prevent the problem */
> .uid("window tester")
> .name("window tester")
> .print()
> class TestWindow() extends WindowFunction[(Long, Int), Long, Int, TimeWindow] 
> with CheckpointedFunction {
>   var state: ValueState[Long] = _
>   var count = 0
>   override def snapshotState(functionSnapshotContext: 
> FunctionSnapshotContext): Unit = {
>     state.update(count)
>   }
>   override def initializeState(context: FunctionInitializationContext): Unit 
> = {
>     val storeDescriptor = new 
> ValueStateDescriptor[Long]("state-xrp-dex-pricer", 
> createTypeInformation[Long])
>     state = context.getKeyedStateStore.getState(storeDescriptor)
>   }
>   override def apply(key: Int, window: TimeWindow, input: Iterable[(Long, 
> Int)], out: Collector[Long]): Unit = {
>     count += input.size
>     out.collect(count)
>   }
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to