Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83861268
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -390,4 +425,141 @@ public void close() {
                        output.close();
                }
        }
    +
    +   // ------------------------------------------------------------------------
    +   //  Watermark handling
    +   // ------------------------------------------------------------------------
    +
    +   /**
    +    * Returns a {@link InternalTimerService} that can be used to query current processing time
    +    * and event time and to set timers. An operator can have several timer services, where
    +    * each has its own namespace serializer. Timer services are differentiated by the string
    +    * key that is given when requesting them, if you call this method with the same key
    +    * multiple times you will get the same timer service instance in subsequent requests.
    +    *
    +    * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
    +    * When a timer fires, this key will also be set as the currently active key.
    +    *
    +    * <p>Each timer has attached metadata, the namespace. Different timer services
    +    * can have a different namespace type. If you don't need namespace differentiation you
    +    * can use {@link VoidNamespaceSerializer} as the namespace serializer.
    +    *
    +    * @param name The name of the requested timer service. If no service exists under the given
    +    *             name a new one will be created and returned.
    +    * @param keySerializer {@code TypeSerializer} for the keys of the timers.
    +    * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
    +    * @param triggerable The {@link Triggerable} that should be invoked when timers fire
    +    *
    +    * @param <K> The type of the timer keys.
    +    * @param <N> The type of the timer namespace.
    +    */
    +   public <K, N> InternalTimerService<N> getInternalTimerService(
    +                   String name,
    +                   TypeSerializer<K> keySerializer,
    +                   TypeSerializer<N> namespaceSerializer,
    +                   Triggerable<K, N> triggerable) {
    +
    +           @SuppressWarnings("unchecked")
    +           HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
    +
    +           if (service == null) {
    +                   if (restoredServices != null && restoredServices.containsKey(name)) {
    +                           @SuppressWarnings("unchecked")
    +                           HeapInternalTimerService.RestoredTimers<K, N> restoredService =
    --- End diff --
    
    `containsKey()` followed by `remove()` seems a bit redundant for this use. I would just
    always call `remove()` and check the return value for null.
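
    A minimal sketch of that pattern, not the actual patch. It assumes `restoredServices` is a plain `java.util.Map` (the diff does not show its declaration) and uses `String` as a hypothetical stand-in for `HeapInternalTimerService.RestoredTimers<K, N>`. The class and method names are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

class RemoveAndCheck {

    // Hypothetical stand-in for the restoredServices field seen in the diff.
    static Map<String, String> restoredServices = new HashMap<>();

    // Returns the restored entry for the given name and removes it from the map,
    // or returns null if nothing was restored under that name.
    static String takeRestored(String name) {
        if (restoredServices == null) {
            return null;
        }
        // Map.remove returns the previous value, or null if the key was absent,
        // so a separate containsKey() lookup is not needed.
        return restoredServices.remove(name);
    }

    public static void main(String[] args) {
        restoredServices.put("window-timers", "restored-state");
        System.out.println(takeRestored("window-timers")); // first call yields the entry
        System.out.println(takeRestored("window-timers")); // entry is gone on the second call
    }
}
```

    This is only equivalent to the `containsKey()` version if the map never stores null values, since `remove()` cannot tell an absent key from a key mapped to null.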

