yunfengzhou-hub commented on code in PR #25501:
URL: https://github.com/apache/flink/pull/25501#discussion_r1798718504
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java:
##########
@@ -128,6 +133,18 @@ public boolean supportsAsyncKeyedStateBackend() {
public <K> AsyncKeyedStateBackend<K> createAsyncKeyedStateBackend(
KeyedStateBackendParameters<K> parameters) {
return new AsyncKeyedStateBackend<K>() {
+ @Nonnull
+ @Override
+ public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+ KeyGroupedInternalPriorityQueue<T> create(
+ @Nonnull String stateName,
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+ return null;
Review Comment:
It might be better to `throw new UnsupportedOperationException();` here instead of returning `null`.
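As a minimal sketch of that suggestion, using hypothetical stand-in types (`QueueFactory` and `ThrowingStub` are illustrative names, not Flink classes): a stub that throws fails at the call site with a clear message, whereas one that returns `null` tends to surface later as a `NullPointerException` far from the cause.

```java
import java.util.PriorityQueue;

// Hypothetical stand-in for the factory method being stubbed; not a Flink interface.
interface QueueFactory {
    <T extends Comparable<T>> PriorityQueue<T> create(String stateName);
}

// Test stub that rejects the unsupported call immediately instead of returning null.
class ThrowingStub implements QueueFactory {
    @Override
    public <T extends Comparable<T>> PriorityQueue<T> create(String stateName) {
        throw new UnsupportedOperationException(
                "create(" + stateName + ") is not supported by this test stub");
    }
}
```

Any test that accidentally reaches `create` then fails with a message naming the unsupported call.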
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java:
##########
@@ -244,6 +244,11 @@ void snapshotState(
&& ((AbstractKeyedStateBackend<?>) keyedStateBackend)
.requiresLegacySynchronousTimerSnapshots(
checkpointOptions.getCheckpointType());
+ requiresLegacyRawKeyedStateSnapshots |=
+ keyedStateBackend instanceof AsyncKeyedStateBackend
+ && ((AsyncKeyedStateBackend<?>) keyedStateBackend)
+ .requiresLegacySynchronousTimerSnapshots(
Review Comment:
How about removing
`AsyncKeyedStateBackend#requiresLegacySynchronousTimerSnapshots` and doing the
following instead?
```java
// TODO remove this once heap-based timers are working with ForSt incremental snapshots!
requiresLegacyRawKeyedStateSnapshots |= keyedStateBackend instanceof AsyncKeyedStateBackend;
```
Or should subclasses like `AsyncKeyedStateBackendAdaptor` override this
method?
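To make the two alternatives concrete, here is a rough sketch with hypothetical stand-in types (`Backend`, `AsyncBackend`, and `SnapshotFlags` are illustrative, not the Flink interfaces, and the checkpoint-type argument is omitted for brevity). Option A treats every async backend the same way; option B keeps an overridable hook so that a subclass such as an adaptor can opt out.

```java
// Hypothetical stand-ins for illustration only; these are not the Flink interfaces.
interface Backend {}

interface AsyncBackend extends Backend {
    // Option B: keep the hook so that subclasses (e.g. an adaptor) can override it.
    default boolean requiresLegacySynchronousTimerSnapshots() {
        return true;
    }
}

class SnapshotFlags {
    // Option A: every async backend takes the legacy path; no hook is needed.
    static boolean optionA(Backend backend) {
        return backend instanceof AsyncBackend;
    }

    // Option B: the backend decides through the overridable hook.
    static boolean optionB(Backend backend) {
        return backend instanceof AsyncBackend
                && ((AsyncBackend) backend).requiresLegacySynchronousTimerSnapshots();
    }
}
```

The two options only differ for a backend that overrides the hook to return `false`, so the choice depends on whether any subclass needs that.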