[
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930166#comment-15930166
]
ASF GitHub Bot commented on FLINK-5544:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3359#discussion_r106669974
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
---
@@ -18,43 +18,306 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Interface for working with time and timers.
*
* <p>This is the internal version of {@link
org.apache.flink.streaming.api.TimerService}
* that allows to specify a key and a namespace to which timers should be
scoped.
*
+ * All d
+ *
+ * @param <K> Type of the keys in the stream
* @param <N> Type of the namespace to which timers are scoped.
*/
@Internal
-public interface InternalTimerService<N> {
+public abstract class InternalTimerService<K, N> implements
ProcessingTimeCallback, EventTimeCallback {
--- End diff --
I would suggest to still keep the old interface and rename this to
`AbstractInternalTimerService<K, N> implements InternalTimerService<N>`. Like
that, we don't need to introduce the generic parameter K in all places, which
is actually giving away some implementation detail (K is used only for a
member, not for the interface methods). I also like to keep the interface slim,
and probably not every code that deals with `InternalTimerService` has to see
all the methods, e.g. for snapshots.
> Implement Internal Timer Service in RocksDB
> -------------------------------------------
>
> Key: FLINK-5544
> URL: https://issues.apache.org/jira/browse/FLINK-5544
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> Now the only implementation of internal timer service is
> HeapInternalTimerService which stores all timers in memory. In the cases
> where the number of keys is very large, the timer service will cost too much
> memory. A implementation which stores timers in RocksDB seems good to deal
> with these cases.
> It might be a little challenging to implement a RocksDB timer service because
> the timers are accessed in different ways. When timers are triggered, we need
> to access timers in the order of timestamp. But when performing checkpoints,
> we must have a method to obtain all timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of
> merge sorting. We can store timers in RocksDB with the format
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put
> together and are sorted.
> Then we can deploy an in-memory heap which keeps the first timer of each key
> group to get the next timer to trigger. When a key group's first timer is
> updated, we can efficiently update the heap.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)