Zakelly commented on code in PR #25501:
URL: https://github.com/apache/flink/pull/25501#discussion_r1805992325
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java:
##########
@@ -241,33 +242,40 @@ public StreamOperatorStateContext streamOperatorStateContext(
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
// -------------- Internal Timer Service Manager --------------
+ // if the operator indicates that it is using custom raw keyed state,
+ // then whatever was written in the raw keyed state snapshot was NOT written
+ // by the internal timer services (because there is only ever one user of raw keyed
+ // state);
+ // in this case, timers should not attempt to restore timers from the raw keyed
+ // state.
+ final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =
+ (prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState)
+ ? rawKeyedStateInputs
+ : Collections.emptyList();
if (keyedStatedBackend != null) {
-
- // if the operator indicates that it is using custom raw keyed state,
- // then whatever was written in the raw keyed state snapshot was NOT written
- // by the internal timer services (because there is only ever one user of raw keyed
- // state);
- // in this case, timers should not attempt to restore timers from the raw keyed
- // state.
- final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =
- (prioritizedOperatorSubtaskStates.isRestored()
- && !isUsingCustomRawKeyedState)
- ? rawKeyedStateInputs
- : Collections.emptyList();
-
timeServiceManager =
timeServiceManagerProvider.create(
environment.getMetricGroup().getIOMetricGroup(),
keyedStatedBackend,
+ keyedStatedBackend.getKeyGroupRange(),
+ environment.getUserCodeClassLoader().asClassLoader(),
+ keyContext,
+ processingTimeService,
+ restoredRawKeyedStateTimers,
+ cancellationContext);
+ }
+ if (asyncKeyedStateBackend != null) {
Review Comment:
How about this instead?
```
if (stateBackend.supportsAsyncKeyedStateBackend()) {
    asyncTimeServiceManager = timeServiceManagerProvider.create(xxxxxxx);
} else {
    asyncTimeServiceManager = timeServiceManager;
}
```
When `AsyncKeyedStateBackendAdaptor` is used, there is no need to create
another `timeServiceManager`; the existing one can be reused.
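
To make the intent concrete, here is a minimal, self-contained sketch of that selection logic. It is not Flink code: `TimeServiceManager`, `selectAsyncManager`, the boolean flag, and the `Supplier` factory are hypothetical stand-ins for the real types and for the elided `create(...)` arguments.

```java
import java.util.function.Supplier;

public class TimeServiceManagerSelection {

    /** Hypothetical stand-in for the real timer service manager type. */
    interface TimeServiceManager {}

    /**
     * Creates a dedicated manager only when the backend natively supports an
     * async keyed state backend; otherwise reuses the synchronous manager.
     * The flag and the factory are assumptions made for this sketch.
     */
    static TimeServiceManager selectAsyncManager(
            boolean supportsAsyncKeyedStateBackend,
            TimeServiceManager syncManager,
            Supplier<TimeServiceManager> factory) {
        if (supportsAsyncKeyedStateBackend) {
            // Native async backend: build a separate manager for it.
            return factory.get();
        }
        // Adaptor case: share the manager that already exists.
        return syncManager;
    }

    public static void main(String[] args) {
        TimeServiceManager sync = new TimeServiceManager() {};
        Supplier<TimeServiceManager> factory = () -> new TimeServiceManager() {};

        TimeServiceManager reused = selectAsyncManager(false, sync, factory);
        TimeServiceManager fresh = selectAsyncManager(true, sync, factory);

        System.out.println("reused is sync manager: " + (reused == sync));
        System.out.println("fresh is sync manager: " + (fresh == sync));
    }
}
```

In the adaptor case the factory is never invoked, so only one manager instance exists.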