[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585555#comment-15585555
 ] 

ASF GitHub Bot commented on FLINK-3674:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83859446
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
    @@ -0,0 +1,325 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.operators.Triggerable;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements 
InternalTimerService<N>, Triggerable {
    +
    +   private final TypeSerializer<K> keySerializer;
    +
    +   private final TypeSerializer<N> namespaceSerializer;
    +
    +   private final ProcessingTimeService processingTimeService;
    +
    +   private long currentWatermark = Long.MIN_VALUE;
    +
    +   private final org.apache.flink.streaming.api.operators.Triggerable<K, 
N> triggerTarget;
    +
    +   private final KeyContext keyContext;
    +
    +   /**
    +    * Processing time timers that are currently in-flight.
    +    */
    +   private final PriorityQueue<InternalTimer<K, N>> 
processingTimeTimersQueue;
    +   private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +   protected ScheduledFuture<?> nextTimer = null;
    +
    +   /**
    +    * Currently waiting watermark callbacks.
    +    */
    +   private final Set<InternalTimer<K, N>> watermarkTimers;
    +   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +   public HeapInternalTimerService(
    +                   TypeSerializer<K> keySerializer,
    +                   TypeSerializer<N> namespaceSerializer,
    +                   org.apache.flink.streaming.api.operators.Triggerable<K, 
N> triggerTarget,
    +                   KeyContext keyContext,
    +                   ProcessingTimeService processingTimeService) {
    +           this.keySerializer = checkNotNull(keySerializer);
    +           this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +           this.triggerTarget = checkNotNull(triggerTarget);
    +           this.keyContext = keyContext;
    +           this.processingTimeService = 
checkNotNull(processingTimeService);
    +
    +           watermarkTimers = new HashSet<>();
    +           watermarkTimersQueue = new PriorityQueue<>(100);
    +
    +           processingTimeTimers = new HashSet<>();
    +           processingTimeTimersQueue = new PriorityQueue<>(100);
    +   }
    +
    +   public HeapInternalTimerService(
    +                   TypeSerializer<K> keySerializer,
    +                   TypeSerializer<N> namespaceSerializer,
    +                   org.apache.flink.streaming.api.operators.Triggerable<K, 
N> triggerTarget,
    +                   KeyContext keyContext,
    +                   ProcessingTimeService processingTimeService,
    +                   RestoredTimers<K, N> restoredTimers) {
    +
    +           this.keySerializer = checkNotNull(keySerializer);
    +           this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +           this.triggerTarget = checkNotNull(triggerTarget);
    +           this.keyContext = keyContext;
    +           this.processingTimeService = 
checkNotNull(processingTimeService);
    +
    --- End diff --
    
    RestoredTimers are serialized together with their type serializers. It 
could make sense to add an equality or compatibility check here against the 
passed-in type serializers. I also wonder whether the serializers need to be 
serialized in the first place.


> Add an interface for Time aware User Functions
> ----------------------------------------------
>
>                 Key: FLINK-3674
>                 URL: https://issues.apache.org/jira/browse/FLINK-3674
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Stephan Ewen
>            Assignee: Aljoscha Krettek
>
> I suggest adding an interface that UDFs can implement, which will let them 
> be notified of watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
>     void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, 
> EventTimeFunction {
>     private long currentEventTime = Long.MIN_VALUE;
>     public String map(String value) {
>         return value + " @ " + currentEventTime;
>     }
>     public void onWatermark(Watermark watermark) {
>         currentEventTime = watermark.getTimestamp();
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to