[ https://issues.apache.org/jira/browse/FLINK-31970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719336#comment-17719336 ]

Piotr Nowojski commented on FLINK-31970:
----------------------------------------

[~YordanPavlov] I was not able to reproduce your error; however, you are 
working with state incorrectly in your {{TestWindow}} function. Please take a 
look at the 
[documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#using-keyed-state],
in particular the following paragraph:
{quote}
It is important to keep in mind that these state objects are only used for 
interfacing with state. The state is not necessarily stored inside but might 
reside on disk or somewhere else. The second thing to keep in mind is that the 
value you get from the state depends on the key of the input element. So the 
value you get in one invocation of your user function can differ from the value 
in another invocation if the keys involved are different.
{quote}
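
To make the quoted paragraph concrete, here is a small plain-Scala model with no 
Flink dependency. {{KeyedValueModel}} and {{KeyedStateDemo}} are hypothetical 
names made up for this illustration, not Flink classes. The model only assumes 
that the runtime sets a current key before each call, roughly as Flink does for 
keyed functions, and contrasts that with a regular field shared by all keys.

{code:scala}
import scala.collection.mutable

// Hypothetical model, not Flink's API: every value()/update() call resolves
// against the key that the "runtime" set beforehand, like keyed state in Flink.
final class KeyedValueModel[K, V](default: V) {
  private val perKey = mutable.Map.empty[K, V]
  private var currentKey: Option[K] = None

  def setCurrentKey(key: K): Unit = currentKey = Some(key)

  // With no key context the model simply throws; the real Flink failure mode
  // is not modeled here.
  private def requireKey(): K =
    currentKey.getOrElse(throw new IllegalStateException("no key context"))

  def value(): V = perKey.getOrElse(requireKey(), default)
  def update(v: V): Unit = perKey(requireKey()) = v
}

object KeyedStateDemo {
  // Processes (key, elementCount) pairs with per-key state and with a plain field.
  def run(inputs: Seq[(Int, Long)]): (Seq[Long], Seq[Long]) = {
    val state = new KeyedValueModel[Int, Long](0L)
    var sharedCount = 0L // regular field: one value for all keys of this instance
    val keyed = mutable.ArrayBuffer.empty[Long]
    val shared = mutable.ArrayBuffer.empty[Long]
    for ((key, n) <- inputs) {
      state.setCurrentKey(key) // the "runtime" sets the key before each call
      state.update(state.value() + n)
      keyed += state.value()
      sharedCount += n
      shared += sharedCount
    }
    (keyed.toList, shared.toList)
  }
}
{code}

For the input {{(1, 2), (2, 5), (1, 3)}} the per-key totals are 2, 5, 5, while 
the shared field yields 2, 7, 10, i.e. the counts of different keys get mixed.
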
The first problem is that your {{TestWindow#snapshotState}} method accesses the 
{{state}} field of a keyed operator/function inside a method that has no key 
context. That's the error I was getting when I tried to run your code. 
Secondly, your {{TestWindow#apply}} method is invoked within a key context, but 
you are storing the actual {{count}} in a regular Java field. So if you ever had 
more than one key (that's not the case in your example, as you are using the 
{{.keyBy(_ => 1)}} key selector), all of the keys processed by a single parallel 
instance of the window operator would be accumulated in the same {{count}} 
field, which doesn't make any sense.
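
The numbers in the reported exception follow from how key groups are split 
among parallel subtasks. The sketch below re-implements (rather than calls) the 
arithmetic that, to my understanding, Flink's {{KeyGroupRangeAssignment}} uses. 
{{KeyGroupMath}} is a hypothetical helper, and a max parallelism of 128 is an 
assumption that is consistent with {{endKeyGroup=127}} in the error.

{code:scala}
// Hypothetical helper re-implementing Flink's key-group split for illustration.
object KeyGroupMath {
  /** Inclusive key-group range owned by one parallel subtask. */
  def rangeForSubtask(maxParallelism: Int, parallelism: Int, subtask: Int): (Int, Int) = {
    require(0 < parallelism && parallelism <= maxParallelism)
    require(0 <= subtask && subtask < parallelism)
    val start = (subtask * maxParallelism + parallelism - 1) / parallelism
    val end = ((subtask + 1) * maxParallelism - 1) / parallelism
    (start, end)
  }

  /** Index of the subtask that owns the given key group. */
  def subtaskForKeyGroup(maxParallelism: Int, parallelism: Int, keyGroup: Int): Int =
    keyGroup * parallelism / maxParallelism
}
{code}

With a max parallelism of 128 and parallelism 3 this yields the ranges 0..42, 
43..85 and 86..127, so the subtask owning {{KeyGroupRange{startKeyGroup=86, 
endKeyGroup=127}}} was asked to access key group 0, which belongs to subtask 0.
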

The corrected window function should look like this:

{code:scala}
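import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class CorrectTestWindow() extends WindowFunction[(Long, Int), Long, Int, TimeWindow]
    with CheckpointedFunction {
  // Handle to keyed state: the value it returns depends on the current key.
  var state: ValueState[Long] = _

  // Nothing to do here: keyed state is checkpointed by Flink itself, and this
  // method has no key context anyway.
  override def snapshotState(functionSnapshotContext: FunctionSnapshotContext): Unit = {
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val storeDescriptor =
      new ValueStateDescriptor[Long]("state-xrp-dex-pricer", createTypeInformation[Long])
    state = context.getKeyedStateStore.getState(storeDescriptor)
  }

  override def apply(key: Int, window: TimeWindow, input: Iterable[(Long, Int)],
                     out: Collector[Long]): Unit = {
    // Read and update the count of the current key only.
    val count = state.value() + input.size
    state.update(count)
    out.collect(count)
  }
}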
{code}
However, even that is somewhat strange. You are creating a tumbling window every 
1 ms, only to aggregate the {{count}} across all past tumbling windows for the 
given key? If you want to count elements only within the given tumbling window, 
your {{apply}} could be as simple as:
{code:scala}
  // Emits the number of elements in this window only; no state is needed.
  override def apply(key: Int, window: TimeWindow, input: Iterable[(Long, Int)],
                     out: Collector[Long]): Unit = {
    out.collect(input.size.toLong)
  }
{code}
You don't need any additional state for that. On the other hand, if you want to 
aggregate results globally, you can use:
{code:scala}
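        .keyBy(_ => 1)
        .window(GlobalWindows.create())
        .trigger(/* a custom trigger is required: the default GlobalWindows trigger never fires */)
        .apply(new TestWindow()) // TestWindow's window type would have to be GlobalWindow, not TimeWindow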
{code}
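
To illustrate the difference between the two {{apply}} variants above, here is a 
plain-Scala model (no Flink involved) of 1 ms tumbling windows for a single key. 
{{TumblingCountModel}} is a hypothetical name; the window-start computation 
assumes a zero offset and follows the same idea as Flink's 
{{TimeWindow.getWindowStartWithOffset}}. As in Flink, windows without elements 
produce no output.

{code:scala}
// Hypothetical model of tumbling-window counting for a single key.
object TumblingCountModel {
  // Start of the tumbling window containing `ts` (offset 0, any sign of ts).
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  /** Per-window element counts, ordered by window start (the stateless apply). */
  def perWindowCounts(timestamps: Seq[Long], size: Long): Seq[(Long, Int)] =
    timestamps
      .groupBy(windowStart(_, size))
      .toSeq
      .map { case (start, ts) => (start, ts.size) }
      .sortBy(_._1)

  /** Running total across all windows so far (what CorrectTestWindow emits). */
  def cumulativeCounts(timestamps: Seq[Long], size: Long): Seq[Long] =
    perWindowCounts(timestamps, size).map(_._2.toLong).scanLeft(0L)(_ + _).tail
}
{code}

For timestamps 0, 0, 1, 3, 3, 3 and a 1 ms window, {{perWindowCounts}} gives 
(0, 2), (1, 1), (3, 3), whereas {{cumulativeCounts}} gives 2, 3, 6.
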

> "Key group 0 is not in KeyGroupRange" when using CheckpointedFunction
> ---------------------------------------------------------------------
>
>                 Key: FLINK-31970
>                 URL: https://issues.apache.org/jira/browse/FLINK-31970
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.17.0
>            Reporter: Yordan Pavlov
>            Priority: Major
>         Attachments: fill-topic.sh, main.scala
>
>
> I am experiencing a problem where the following exception would be thrown on 
> Flink stop (stop with savepoint):
>  
> {code:java}
> org.apache.flink.util.SerializedThrowable: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=86, endKeyGroup=127}.{code}
>  
> I do not have a non-deterministic keyBy() operator; in fact, I use 
> {code:java}
> .keyBy(_ => 1){code}
> I believe the problem is related to using RocksDB state along with a 
> {code:java}
> CheckpointedFunction{code}
> In my test program I have commented out a reduction of the parallelism which 
> would make the problem go away. I am attaching a standalone program that 
> reproduces the problem and also a script which generates the input data. For 
> clarity, here is the essence of the job:
>  
>  
> {code:scala}
> env.fromSource(kafkaSource, watermarkStrategy, "KafkaSource")
> .setParallelism(3)
> .keyBy(_ => 1)
> .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.MILLISECONDS)))
> .apply(new TestWindow())
> /* .setParallelism(1) this would prevent the problem */
> .uid("window tester")
> .name("window tester")
> .print()
> class TestWindow() extends WindowFunction[(Long, Int), Long, Int, TimeWindow] 
> with CheckpointedFunction {
>   var state: ValueState[Long] = _
>   var count = 0
>   override def snapshotState(functionSnapshotContext: FunctionSnapshotContext): Unit = {
>     state.update(count)
>   }
>   override def initializeState(context: FunctionInitializationContext): Unit = {
>     val storeDescriptor = new ValueStateDescriptor[Long]("state-xrp-dex-pricer", createTypeInformation[Long])
>     state = context.getKeyedStateStore.getState(storeDescriptor)
>   }
>   override def apply(key: Int, window: TimeWindow, input: Iterable[(Long, Int)], out: Collector[Long]): Unit = {
>     count += input.size
>     out.collect(count)
>   }
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
