[
https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493599#comment-16493599
]
ASF GitHub Bot commented on FLINK-9423:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6062#discussion_r191413234
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
@@ -301,114 +259,39 @@ public void advanceWatermark(long time) throws Exception {
             keySerializer.snapshotConfiguration(),
             namespaceSerializer,
             namespaceSerializer.snapshotConfiguration(),
-            getEventTimeTimerSetForKeyGroup(keyGroupIdx),
-            getProcessingTimeTimerSetForKeyGroup(keyGroupIdx));
+            eventTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx),
+            processingTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx));
     }

     /**
      * Restore the timers (both processing and event time ones) for a given {@code keyGroupIdx}.
      *
-     * @param restoredTimersSnapshot the restored snapshot containing the key-group's timers,
+     * @param restoredSnapshot the restored snapshot containing the key-group's timers,
      *                       and the serializers that were used to write them
      * @param keyGroupIdx the id of the key-group to be put in the snapshot.
      */
     @SuppressWarnings("unchecked")
-    public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredTimersSnapshot, int keyGroupIdx) throws IOException {
-        this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredTimersSnapshot;
+    public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredSnapshot, int keyGroupIdx) {
+        this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredSnapshot;

-        if ((this.keyDeserializer != null && !this.keyDeserializer.equals(restoredTimersSnapshot.getKeySerializer())) ||
-            (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredTimersSnapshot.getNamespaceSerializer()))) {
+        if ((this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
--- End diff --
This check could be factored out into a method with a meaningful and easy
to understand name, e.g. `checkSerializerCompatibility`.
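For illustration, the extraction could look roughly like the stand-alone sketch below. It is not taken from the pull request: `SerializerCheckSketch` and `isIncompatible` are hypothetical names, plain `Object` parameters stand in for Flink's serializer types, and the exception type and message are assumptions, since the diff excerpt does not show what happens when the check fails.

```java
/** Stand-alone sketch; Object stands in for Flink's serializer types. */
final class SerializerCheckSketch {

    /**
     * True when a serializer has already been configured (non-null) and
     * differs from the one found in the restored snapshot.
     */
    static boolean isIncompatible(Object configured, Object restored) {
        return configured != null && !configured.equals(restored);
    }

    /**
     * Hypothetical helper that mirrors the inlined condition from the diff.
     * The exception type and message are assumptions for this sketch.
     */
    static void checkSerializerCompatibility(
            Object keyDeserializer,
            Object restoredKeySerializer,
            Object namespaceDeserializer,
            Object restoredNamespaceSerializer) {
        if (isIncompatible(keyDeserializer, restoredKeySerializer)
                || isIncompatible(namespaceDeserializer, restoredNamespaceSerializer)) {
            throw new IllegalArgumentException(
                "Tried to restore timers with different serializers.");
        }
    }
}
```

With a helper along these lines, `restoreTimersForKeyGroup` would reduce to the cast plus a single, self-describing call.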
> Implement efficient deletes for heap based timer service
> --------------------------------------------------------
>
> Key: FLINK-9423
> URL: https://issues.apache.org/jira/browse/FLINK-9423
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.5.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` cannot support
> efficient timer deletes: deleting a timer is currently O(n), where n is the
> number of registered timers.
>
> We can keep track of the timers' positions in the priority queue and (in
> combination with the already existing set/map) have a more efficient
> algorithm for deletes.
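
The idea in the issue description can be sketched as a binary min-heap paired with a map from each element to its current array index. This is a minimal illustration, not the code from the pull request: `IndexedTimerHeap` is a hypothetical name, and plain `long` timestamps stand in for Flink's timer objects. Because the map yields the element's position in expected constant time, a delete only needs one swap with the last slot plus a sift, so it costs O(log n) instead of a linear scan.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: min-heap of timestamps that tracks each element's position. */
final class IndexedTimerHeap {
    private final List<Long> heap = new ArrayList<>();
    private final Map<Long, Integer> position = new HashMap<>();

    /** Adds a timestamp; returns false if it is already registered. */
    boolean add(long ts) {
        if (position.containsKey(ts)) {
            return false;
        }
        heap.add(ts);
        position.put(ts, heap.size() - 1);
        siftUp(heap.size() - 1);
        return true;
    }

    /** Smallest timestamp, or null if the heap is empty. */
    Long peek() {
        return heap.isEmpty() ? null : heap.get(0);
    }

    /** Removes and returns the smallest timestamp, or null if empty. */
    Long poll() {
        Long head = peek();
        if (head != null) {
            remove(head);
        }
        return head;
    }

    /** Deletes an arbitrary timestamp using its tracked position. */
    boolean remove(long ts) {
        Integer idx = position.remove(ts);
        if (idx == null) {
            return false;
        }
        int last = heap.size() - 1;
        Long moved = heap.remove(last);
        if (idx != last) {
            // Fill the hole with the former last element, then restore order.
            heap.set(idx, moved);
            position.put(moved, idx);
            siftUp(idx);
            siftDown(idx);
        }
        return true;
    }

    int size() {
        return heap.size();
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (heap.get(i) >= heap.get(parent)) {
                break;
            }
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int n = heap.size();
        while (true) {
            int left = 2 * i + 1;
            int right = left + 1;
            int smallest = i;
            if (left < n && heap.get(left) < heap.get(smallest)) {
                smallest = left;
            }
            if (right < n && heap.get(right) < heap.get(smallest)) {
                smallest = right;
            }
            if (smallest == i) {
                return;
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    /** Swaps two slots and keeps the position map in sync. */
    private void swap(int a, int b) {
        Long x = heap.get(a);
        Long y = heap.get(b);
        heap.set(a, y);
        heap.set(b, x);
        position.put(y, a);
        position.put(x, b);
    }
}
```

The position map here plays the role of the "already existing set/map" mentioned above; an alternative with the same effect would be to store the index inside each timer object.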