[
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930174#comment-15930174
]
ASF GitHub Bot commented on FLINK-5544:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3359#discussion_r106671203
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
---
@@ -18,43 +18,306 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Interface for working with time and timers.
*
* <p>This is the internal version of {@link
org.apache.flink.streaming.api.TimerService}
* that allows to specify a key and a namespace to which timers should be
scoped.
*
+ * All d
+ *
+ * @param <K> Type of the keys in the stream
* @param <N> Type of the namespace to which timers are scoped.
*/
@Internal
-public interface InternalTimerService<N> {
+public abstract class InternalTimerService<K, N> implements
ProcessingTimeCallback, EventTimeCallback {
+
+ protected final ProcessingTimeService processingTimeService;
+
+ protected final KeyContext keyContext;
+
+ protected final int totalKeyGroups;
+
+ protected final KeyGroupRange keyGroupRange;
+
+ /**
+ * The one and only Future (if any) registered to execute the
+ * next {@link Triggerable} action, when its (processing) time arrives.
+ */
+ protected ScheduledFuture<?> nextTimer;
+
+ /**
+ * The local event time, as denoted by the last received
+ * {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}.
+ */
+ private long currentWatermark = Long.MIN_VALUE;
+
+ // Variables to be set when the service is started.
+
+ protected TypeSerializer<K> keySerializer;
+
+ protected TypeSerializer<N> namespaceSerializer;
+
+ private InternalTimer.TimerSerializer<K, N> timerSerializer;
+
+ protected Triggerable<K, N> triggerTarget;
+
+ private volatile boolean isInitialized;
+
+ public InternalTimerService(
+ int totalKeyGroups,
+ KeyGroupRange keyGroupRange,
+ KeyContext keyContext,
+ ProcessingTimeService processingTimeService) {
+
+ this.totalKeyGroups = totalKeyGroups;
+ this.keyGroupRange = checkNotNull(keyGroupRange);
+ this.keyContext = checkNotNull(keyContext);
+ this.processingTimeService =
checkNotNull(processingTimeService);
+ }
/** Returns the current processing time. */
- long currentProcessingTime();
+ public long currentProcessingTime() {
+ return processingTimeService.getCurrentProcessingTime();
+ }
/** Returns the current event-time watermark. */
- long currentWatermark();
+ public long currentWatermark() {
+ return currentWatermark;
+ }
/**
* Registers a timer to be fired when processing time passes the given
time. The namespace
* you pass here will be provided when the timer fires.
*/
- void registerProcessingTimeTimer(N namespace, long time);
+ abstract public void registerProcessingTimeTimer(N namespace, long
time);
/**
* Deletes the timer for the given key and namespace.
*/
- void deleteProcessingTimeTimer(N namespace, long time);
+ abstract public void deleteProcessingTimeTimer(N namespace, long time);
/**
* Registers a timer to be fired when the event-time watermark passes the given
time. The namespace
* you pass here will be provided when the timer fires.
*/
- void registerEventTimeTimer(N namespace, long time);
+ abstract public void registerEventTimeTimer(N namespace, long time);
/**
* Deletes the timer for the given key and namespace.
*/
- void deleteEventTimeTimer(N namespace, long time);
+ abstract public void deleteEventTimeTimer(N namespace, long time);
+
+ /**
+ * Returns the timers for the given key group.
+ */
+ abstract public Set<InternalTimer<K, N>>
getEventTimeTimersForKeyGroup(int keyGroup);
+
+ /**
+ * Returns the timers for the given key group.
+ */
+ abstract public Set<InternalTimer<K, N>>
getProcessingTimeTimersForKeyGroup(int keyGroup);
+
+ /**
+ * Restores the timers for the given key group.
+ */
+ abstract public void restoreEventTimeTimersForKeyGroup(int keyGroup,
Iterable<InternalTimer<K, N>> timers);
+
+ /**
+ * Restores the timers for the given key group.
+ */
+ abstract public void restoreProcessingTimeTimersForKeyGroup(int
keyGroup, Iterable<InternalTimer<K, N>> timers);
+
+ /**
+ * Starts the execution of the timer service
+ */
+ abstract public void start();
+
+ /**
+ * Closes the timer service.
+ */
+ abstract public void close();
+
+ public void advanceWatermark(long watermark) throws Exception {
+ if (watermark < currentWatermark) {
+ throw new IllegalStateException("The watermark is
late.");
+ }
+
+ currentWatermark = watermark;
+
+ onEventTime(watermark);
+ }
+
+ /**
+ * Snapshots the timers (both processing and event time ones) for a
given {@code keyGroupIdx}.
+ * @param stream the stream to write to.
+ * @param keyGroupIdx the id of the key-group to be put in the snapshot.
+ */
+ public void snapshotTimersForKeyGroup(DataOutputViewStreamWrapper
stream, int keyGroupIdx) throws Exception {
+ InstantiationUtil.serializeObject(stream, keySerializer);
+ InstantiationUtil.serializeObject(stream, namespaceSerializer);
+
+ // write the event time timers
+ Collection<InternalTimer<K, N>> eventTimers =
getEventTimeTimersForKeyGroup(keyGroupIdx);
+ if (eventTimers != null) {
+ stream.writeInt(eventTimers.size());
+ for (InternalTimer<K, N> timer : eventTimers) {
+ this.timerSerializer.serialize(timer, stream);
+ }
+ } else {
+ stream.writeInt(0);
+ }
+
+ // write the processing time timers
+ Collection<InternalTimer<K, N>> processingTimers =
getProcessingTimeTimersForKeyGroup(keyGroupIdx);
+ if (processingTimers != null) {
+ stream.writeInt(processingTimers.size());
+ for (InternalTimer<K, N> timer : processingTimers) {
+ this.timerSerializer.serialize(timer, stream);
+ }
+ } else {
+ stream.writeInt(0);
+ }
+ }
+
+ /**
+ * Restore the timers (both processing and event time ones) for a given
{@code keyGroupIdx}.
+ * @param stream the stream to read from.
+ * @param keyGroupIdx the id of the key-group to be put in the snapshot.
+ * @param userCodeClassLoader the class loader that will be used to
deserialize
+ * the
local key and namespace serializers.
+ */
+ public void restoreTimersForKeyGroup(DataInputViewStreamWrapper stream,
int keyGroupIdx, ClassLoader userCodeClassLoader) throws IOException,
ClassNotFoundException {
+ TypeSerializer<K> tmpKeySerializer =
InstantiationUtil.deserializeObject(stream, userCodeClassLoader);
+ TypeSerializer<N> tmpNamespaceSerializer =
InstantiationUtil.deserializeObject(stream, userCodeClassLoader);
+
+ if ((this.keySerializer != null &&
!this.keySerializer.equals(tmpKeySerializer)) ||
+ (this.namespaceSerializer != null &&
!this.namespaceSerializer.equals(tmpNamespaceSerializer))) {
+
+ throw new IllegalArgumentException("Tried to
restore timers " +
+ "for the same service with
different serializers.");
+ }
+
+ this.keySerializer = tmpKeySerializer;
+ this.namespaceSerializer = tmpNamespaceSerializer;
+
+ InternalTimer.TimerSerializer<K, N> timerSerializer =
+ new
InternalTimer.TimerSerializer<>(this.keySerializer, this.namespaceSerializer);
+
+ checkArgument(keyGroupRange.contains(keyGroupIdx),
+ "Key Group " + keyGroupIdx + " does not belong
to the local range.");
+
+ // read the event time timers
+ int sizeOfEventTimeTimers = stream.readInt();
+ if (sizeOfEventTimeTimers > 0) {
+ List<InternalTimer<K, N>> eventTimeTimers = new
ArrayList<>();
+ for (int i = 0; i < sizeOfEventTimeTimers; i++) {
+ InternalTimer<K, N> timer =
timerSerializer.deserialize(stream);
+
+ eventTimeTimers.add(timer);
+ }
+
+ restoreEventTimeTimersForKeyGroup(keyGroupIdx,
eventTimeTimers);
+ }
+
+ // read the processing time timers
+ int sizeOfProcessingTimeTimers = stream.readInt();
+ if (sizeOfProcessingTimeTimers > 0) {
+ List<InternalTimer<K, N>> processingTimeTimers = new
ArrayList<>();
+ for (int i = 0; i < sizeOfProcessingTimeTimers; i++) {
+ InternalTimer<K, N> timer =
timerSerializer.deserialize(stream);
+ processingTimeTimers.add(timer);
+ }
+
+ restoreProcessingTimeTimersForKeyGroup(keyGroupIdx,
processingTimeTimers);
+ }
+ }
+
+ /**
+ * Starts the local {@link InternalTimerService} by:
+ * <ol>
+ * <li>Setting the {@code keySerializer} and {@code
namespaceSerializer} for the timers it will contain.</li>
+ * <li>Setting the {@code triggerTarget} which contains the action
to be performed when a timer fires.</li>
+ * <li>Re-registering timers that were retrieved after recovering
from a node failure, if any.</li>
+ * </ol>
+ * This method can be called multiple times, as long as it is called
with the same serializers.
+ */
+ void startTimerService(
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer,
+ Triggerable<K, N> triggerTarget)
+ {
+
+ if (isInitialized) {
--- End diff --
Either `isInitialized` does not need to be volatile, or this code is
potentially broken: `volatile` only guarantees visibility, so checking the flag
and then setting it is not atomic. If you need the thread safety, I suggest
replacing this with `AtomicBoolean::compareAndSet(...)`.
> Implement Internal Timer Service in RocksDB
> -------------------------------------------
>
> Key: FLINK-5544
> URL: https://issues.apache.org/jira/browse/FLINK-5544
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> Currently, the only implementation of the internal timer service is
> HeapInternalTimerService, which stores all timers in memory. In cases
> where the number of keys is very large, the timer service will consume too
> much memory. An implementation that stores timers in RocksDB seems well
> suited to these cases.
> It might be a little challenging to implement a RocksDB timer service because
> the timers are accessed in different ways. When timers are triggered, we need
> to access them in timestamp order. But when performing checkpoints,
> we need a way to obtain all timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of
> merge sorting. We can store timers in RocksDB with the format
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put
> together and are sorted.
> Then we can deploy an in-memory heap which keeps the first timer of each key
> group to get the next timer to trigger. When a key group's first timer is
> updated, we can efficiently update the heap.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)