dawidwys commented on a change in pull request #13405:
URL: https://github.com/apache/flink/pull/13405#discussion_r491971571
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
##########
@@ -74,19 +74,16 @@
private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
- private final boolean useLegacySynchronousSnapshots;
-
InternalTimeServiceManager(
- KeyGroupRange localKeyGroupRange,
- KeyContext keyContext,
- PriorityQueueSetFactory priorityQueueSetFactory,
- ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) {
Review comment:
It can. Let me explain, though, why I think it is unnecessary in the
ctor. (One thing I forgot in the PR: I wanted to make
`snapshotStateForKeyGroup` private. It is used only in
`InternalTimeServiceManager#snapshotState`.)
The flag is used only in `snapshotStateForKeyGroup`, which is called only
from the `snapshotState` method. The `snapshotState` method looks like:
```
public void snapshotState(
        KeyedStateBackend<?> keyedStateBackend,
        StateSnapshotContext context,
        String operatorName) throws Exception {
    //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
    if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
        ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {
        ....
        snapshotStateForKeyGroup(new DataOutputViewStreamWrapper(out), keyGroupIdx);
        ....
    }
}
```
Effectively this flag is checked twice: once in the precondition, against
the value passed in the ctor, and a second time in the `snapshotState` method,
against the value set in the passed `KeyedStateBackend`. In my opinion the
single check in `snapshotState` is enough.
The only case in which the precondition in `snapshotStateForKeyGroup` could
fail is if we created the `InternalTimeServiceManager` from a different
`KeyedStateBackend` than the one we use for snapshotting. I am quite sure this
would lead to far more problems than just this precondition failing.
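To make the argument concrete, here is a minimal sketch of the two variants. The types `SketchBackend`, `ManagerWithCtorFlag` and `ManagerWithoutCtorFlag` are hypothetical stand-ins, not the Flink classes, and the actual snapshot writing is reduced to recording the key group index.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the keyed state backend; not the Flink class.
interface SketchBackend {
    boolean requiresLegacySynchronousTimerSnapshots();
}

// Variant A: the flag is copied into the manager in the ctor and
// re-checked as a precondition inside snapshotStateForKeyGroup.
final class ManagerWithCtorFlag {
    private final boolean useLegacySynchronousSnapshots;
    final List<Integer> written = new ArrayList<>();

    ManagerWithCtorFlag(boolean useLegacySynchronousSnapshots) {
        this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;
    }

    void snapshotState(SketchBackend backend, int keyGroupIdx) {
        // First check: against the backend passed to the snapshot call.
        if (backend.requiresLegacySynchronousTimerSnapshots()) {
            snapshotStateForKeyGroup(keyGroupIdx);
        }
    }

    private void snapshotStateForKeyGroup(int keyGroupIdx) {
        // Second check: against the value captured in the ctor.
        if (!useLegacySynchronousSnapshots) {
            throw new IllegalStateException("legacy snapshots not enabled");
        }
        written.add(keyGroupIdx);
    }
}

// Variant B: the backend passed to snapshotState is the only source of truth.
final class ManagerWithoutCtorFlag {
    final List<Integer> written = new ArrayList<>();

    void snapshotState(SketchBackend backend, int keyGroupIdx) {
        if (backend.requiresLegacySynchronousTimerSnapshots()) {
            snapshotStateForKeyGroup(keyGroupIdx);
        }
    }

    private void snapshotStateForKeyGroup(int keyGroupIdx) {
        written.add(keyGroupIdx);
    }
}
```

In this sketch the second check in variant A can only fire when the manager was built with a flag that disagrees with the backend later passed to `snapshotState`, which is the mismatch case described above.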
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
##########
@@ -282,8 +283,8 @@ public void setCurrentKey(Object key) {
if (keyedStateBackend != null) {
try {
// need to work around type restrictions
- @SuppressWarnings("unchecked,rawtypes")
- AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;
+ @SuppressWarnings("rawtypes")
+ CheckpointableKeyedStateBackend rawBackend = keyedStateBackend;
rawBackend.setCurrentKey(key);
Review comment:
On this line we actually cannot remove the "rawtypes" suppression. We
can remove it from the method-level annotation, though. I will do that.
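A minimal sketch of what I mean by keeping the suppression on the line rather than on the method. `SketchKeyedBackend`, `RecordingBackend` and `SketchStateHandler` are hypothetical names for illustration, not the Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical generic backend; stands in for a keyed state backend.
interface SketchKeyedBackend<K> {
    void setCurrentKey(K key);
}

// Test helper that records the keys it is given.
final class RecordingBackend<K> implements SketchKeyedBackend<K> {
    final List<K> keys = new ArrayList<>();

    @Override
    public void setCurrentKey(K key) {
        keys.add(key);
    }
}

final class SketchStateHandler {
    private final SketchKeyedBackend<?> backend;

    SketchStateHandler(SketchKeyedBackend<?> backend) {
        this.backend = backend;
    }

    // No method-level @SuppressWarnings here.
    void setCurrentKey(Object key) {
        if (backend != null) {
            // need to work around type restrictions:
            // the annotation sits on the one declaration that names the raw type.
            @SuppressWarnings("rawtypes")
            SketchKeyedBackend rawBackend = backend;
            // Depending on compiler lint settings, this call through the raw
            // reference may still be reported as unchecked; the sketch only
            // shows where the "rawtypes" annotation is placed.
            rawBackend.setCurrentKey(key);
        }
    }
}
```

`@SuppressWarnings` is allowed on local variable declarations, so the suppression can be limited to the single declaration that needs it.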
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
##########
@@ -217,14 +218,14 @@ void snapshotState(
}
public void notifyCheckpointComplete(long checkpointId) throws Exception {
- if (keyedStateBackend != null) {
- keyedStateBackend.notifyCheckpointComplete(checkpointId);
+ if (keyedStateBackend != null && keyedStateBackend instanceof CheckpointListener) {
Review comment:
Makes sense.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
##########
@@ -74,19 +74,16 @@
private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
- private final boolean useLegacySynchronousSnapshots;
-
InternalTimeServiceManager(
- KeyGroupRange localKeyGroupRange,
- KeyContext keyContext,
- PriorityQueueSetFactory priorityQueueSetFactory,
- ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) {
Review comment:
I think I changed it the way you suggested, @carp84.