Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6062#discussion_r191413234
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
---
@@ -301,114 +259,39 @@ public void advanceWatermark(long time) throws Exception {
keySerializer.snapshotConfiguration(),
namespaceSerializer,
namespaceSerializer.snapshotConfiguration(),
- getEventTimeTimerSetForKeyGroup(keyGroupIdx),
- getProcessingTimeTimerSetForKeyGroup(keyGroupIdx));
+ eventTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx),
+ processingTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx));
}
/**
* Restore the timers (both processing and event time ones) for a given
{@code keyGroupIdx}.
*
- * @param restoredTimersSnapshot the restored snapshot containing the
key-group's timers,
+ * @param restoredSnapshot the restored snapshot containing the
key-group's timers,
* and the serializers that were used to write
them
* @param keyGroupIdx the id of the key-group to be put in the snapshot.
*/
@SuppressWarnings("unchecked")
- public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?>
restoredTimersSnapshot, int keyGroupIdx) throws IOException {
- this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>)
restoredTimersSnapshot;
+ public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?>
restoredSnapshot, int keyGroupIdx) {
+ this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>)
restoredSnapshot;
- if ((this.keyDeserializer != null &&
!this.keyDeserializer.equals(restoredTimersSnapshot.getKeySerializer())) ||
- (this.namespaceDeserializer != null &&
!this.namespaceDeserializer.equals(restoredTimersSnapshot.getNamespaceSerializer())))
{
+ if ((this.keyDeserializer != null &&
!this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
--- End diff --
This check could be factored out into a method with a meaningful,
easy-to-understand name, e.g. `checkSerializerCompatibility`.
---