Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106670490
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 ---
    @@ -18,43 +18,306 @@
     package org.apache.flink.streaming.api.operators;
     
     import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Interface for working with time and timers.
      *
      * <p>This is the internal version of {@link 
org.apache.flink.streaming.api.TimerService}
      * that allows to specify a key and a namespace to which timers should be 
scoped.
      *
    + * All d
    + * 
    + * @param <K> Type of the keys in the stream
      * @param <N> Type of the namespace to which timers are scoped.
      */
     @Internal
    -public interface InternalTimerService<N> {
    +public abstract class InternalTimerService<K, N> implements 
ProcessingTimeCallback, EventTimeCallback {
    +
    +   protected final ProcessingTimeService processingTimeService;
    +
    +   protected final KeyContext keyContext;
    +
    +   protected final int totalKeyGroups;
    +
    +   protected final KeyGroupRange keyGroupRange;
    +
    +   /**
    +    * The one and only Future (if any) registered to execute the
    +    * next {@link Triggerable} action, when its (processing) time arrives.
    +    */
    +   protected ScheduledFuture<?> nextTimer;
    +
    +   /**
    +    * The local event time, as denoted by the last received
    +    * {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}.
    +    */
    +   private long currentWatermark = Long.MIN_VALUE;
    +
    +   // Variables to be set when the service is started.
    +
    +   protected TypeSerializer<K> keySerializer;
    +
    +   protected TypeSerializer<N> namespaceSerializer;
    +
    +   private InternalTimer.TimerSerializer<K, N> timerSerializer;
    +
    +   protected Triggerable<K, N> triggerTarget;
    +
    +   private volatile boolean isInitialized;
    +
    +   public InternalTimerService(
    +                   int totalKeyGroups, 
    +                   KeyGroupRange keyGroupRange, 
    +                   KeyContext keyContext, 
    +                   ProcessingTimeService processingTimeService) {
    +           
    +           this.totalKeyGroups = totalKeyGroups;
    +           this.keyGroupRange = checkNotNull(keyGroupRange);
    +           this.keyContext = checkNotNull(keyContext);
    +           this.processingTimeService = 
checkNotNull(processingTimeService);
    +   }
     
        /** Returns the current processing time. */
    -   long currentProcessingTime();
    +   public long currentProcessingTime() {
    +           return processingTimeService.getCurrentProcessingTime();
    +   }
     
        /** Returns the current event-time watermark. */
    -   long currentWatermark();
    +   public long currentWatermark() {
    +           return currentWatermark;
    +   }
     
        /**
         * Registers a timer to be fired when processing time passes the given 
time. The namespace
         * you pass here will be provided when the timer fires.
         */
    -   void registerProcessingTimeTimer(N namespace, long time);
    +   abstract public void registerProcessingTimeTimer(N namespace, long 
time);
     
        /**
         * Deletes the timer for the given key and namespace.
         */
    -   void deleteProcessingTimeTimer(N namespace, long time);
    +   abstract public void deleteProcessingTimeTimer(N namespace, long time);
     
        /**
         * Registers a timer to be fired when processing time passes the given 
time. The namespace
         * you pass here will be provided when the timer fires.
         */
    -   void registerEventTimeTimer(N namespace, long time);
    +   abstract public void registerEventTimeTimer(N namespace, long time);
     
        /**
         * Deletes the timer for the given key and namespace.
         */
    -   void deleteEventTimeTimer(N namespace, long time);
    +   abstract public void deleteEventTimeTimer(N namespace, long time);
    +
    +   /**
    +    * Returns the timers for the given key group.
    +    */
    +   abstract public Set<InternalTimer<K, N>> 
getEventTimeTimersForKeyGroup(int keyGroup);
    +
    +   /**
    +    * Returns the timers for the given key group.
    +    */
    +   abstract public Set<InternalTimer<K, N>> 
getProcessingTimeTimersForKeyGroup(int keyGroup);
    +
    +   /**
    +    * Restores the timers for the given key group.
    +    */
    +   abstract public void restoreEventTimeTimersForKeyGroup(int keyGroup, 
Iterable<InternalTimer<K, N>> timers);
    +
    +   /**
    +    * Restores the timers for the given key group.
    +    */
    +   abstract public void restoreProcessingTimeTimersForKeyGroup(int 
keyGroup, Iterable<InternalTimer<K, N>> timers);
    +
    +   /**
    +    * Starts the execution of the timer service
    +    */
    +   abstract public void start();
    +
    +   /**
    +    * Closes the timer service.
    +    */
    +   abstract public void close();
    +   
    +   public void advanceWatermark(long watermark) throws Exception {
    +           if (watermark < currentWatermark) {
    +                   throw new IllegalStateException("The watermark is 
late.");
    +           }
    +           
    +           currentWatermark = watermark;
    +           
    +           onEventTime(watermark);
    +   }
    +
    +   /**
    +    * Snapshots the timers (both processing and event time ones) for a 
given {@code keyGroupIdx}.
    +    * @param stream the stream to write to.
    +    * @param keyGroupIdx the id of the key-group to be put in the snapshot.
    +    */
    +   public void snapshotTimersForKeyGroup(DataOutputViewStreamWrapper 
stream, int keyGroupIdx) throws Exception {
    --- End diff --
    
    I think this does not require public visibility.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to