Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1819#discussion_r56798327
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/HistogramBasedWatermarkEmitter.java ---
    @@ -0,0 +1,263 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.api.common.functions.AbstractRichFunction;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.streaming.runtime.operators.Triggerable;
    +
    +import java.util.Map;
    +import java.util.TreeMap;
    +
    +/**
    + * To keep lateness as low as possible when processing a stream, the watermark should be as close to
    + * processing time as possible. Unfortunately, in streams with late data, i.e. data with timestamps
    + * smaller than the last received watermark, this means that some elements will be discarded due to
    + * lateness. To avoid this, this extractor periodically samples the timestamps of the elements in a
    + * stream and keeps a histogram of the observed lateness. Based on this histogram, it sets the watermark
    + * lateness to the lowest possible value that still guarantees that a user-specified percentage of the
    + * elements in the stream is covered and not dropped due to lateness.
    + *
    + * <p>More precisely, the user specifies i) the duration of the sampling period, ii) the length of the
    + * interval between the end of one sampling period and the start of the next, and iii) the percentage of
    + * elements in the stream (late and non-late) that should be covered, i.e. considered non-late.
    + * Given this information, <b>during the sampling period</b> the extractor keeps a per-second lateness
    + * histogram, i.e. a histogram showing how many elements were 0, 1, 2... seconds late, and the maximum
    + * (event-time) timestamp seen so far. When <b>the sampling period ends</b>, it computes the minimum
    + * lateness that covers the user-specified percentage of the data. Whenever a watermark is emitted, its
    + * timestamp is the maximum (event-time) timestamp seen up to that point in the stream minus this
    + * computed value. The value is not updated until the <b>end</b> of the next sampling period.
    + * */
    +public abstract class HistogramBasedWatermarkEmitter<T> extends AbstractRichFunction
    +   implements AssignerWithPeriodicWatermarks<T>, Triggerable {
    +
    +   private static final long serialVersionUID = 1L;
    +
    +   private boolean testing;
    +
    +   /**
    +    * A map holding the histogram. Currently we assume
    +    * buckets of 2^10 milliseconds, which is roughly a second.
    --- End diff --
    
    Shouldn't the bucket size depend heavily on the application?
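A minimal sketch of the histogram logic the Javadoc describes, assuming the bucket width becomes a constructor parameter instead of the fixed 2^10 ms. `LatenessHistogram` and its methods are hypothetical names, not part of the PR or of Flink's API. The sketch also assumes that an element's lateness is the gap between the maximum timestamp seen so far and the element's own timestamp. Sampling-period scheduling and watermark emission are left out.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch (not the PR's code): a lateness histogram whose bucket
 * width is supplied by the application instead of being fixed at 2^10 ms.
 */
final class LatenessHistogram {

    private final long bucketWidthMillis;
    // bucket index -> number of sampled elements whose lateness fell into that bucket
    private final TreeMap<Long, Long> buckets = new TreeMap<>();
    private long totalElements = 0;
    private long maxTimestamp = Long.MIN_VALUE;

    LatenessHistogram(long bucketWidthMillis) {
        if (bucketWidthMillis <= 0) {
            throw new IllegalArgumentException("bucket width must be positive");
        }
        this.bucketWidthMillis = bucketWidthMillis;
    }

    /** Records one element's event timestamp during the sampling period. */
    void add(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        // Assumption: lateness = max timestamp seen so far minus this element's timestamp (never negative).
        long lateness = maxTimestamp - timestamp;
        buckets.merge(lateness / bucketWidthMillis, 1L, Long::sum);
        totalElements++;
    }

    /**
     * Smallest lateness, rounded up to a bucket boundary, such that at least
     * {@code coverage} (a fraction in (0, 1]) of the sampled elements have a
     * lateness strictly below it.
     */
    long minLatenessCovering(double coverage) {
        if (coverage <= 0.0 || coverage > 1.0) {
            throw new IllegalArgumentException("coverage must be in (0, 1]");
        }
        if (totalElements == 0) {
            return 0L;
        }
        long needed = (long) Math.ceil(coverage * totalElements);
        long seen = 0;
        for (Map.Entry<Long, Long> entry : buckets.entrySet()) {
            seen += entry.getValue();
            if (seen >= needed) {
                // Bucket k holds lateness in [k * w, (k + 1) * w), so (k + 1) * w covers all of it.
                return (entry.getKey() + 1) * bucketWidthMillis;
            }
        }
        // Not reached while needed <= totalElements; kept as a safe fallback.
        return (buckets.lastKey() + 1) * bucketWidthMillis;
    }

    /** Watermark timestamp: maximum event time seen so far minus the chosen lateness. */
    long watermark(long allowedLateness) {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - allowedLateness;
    }
}
```

Take the timestamps 10000, 9500, 12000, 11800, and 9000 ms. With 1000 ms buckets, covering all elements requires 4000 ms of lateness. The PR's 1024 ms buckets yield 3072 ms for the same data. Because the result is rounded to a bucket boundary, the computed lateness depends on the chosen width.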

