Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1819#discussion_r56798327
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/HistogramBasedWatermarkEmitter.java
---
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * When processing a stream, and for the lateness to be as low as possible, the watermark should
+ * be as close to processing time as possible. Unfortunately, in streams with late data, i.e.
+ * data with timestamps smaller than the last received watermark, this means that a number of elements
+ * will be discarded due to lateness. In order to avoid this, this extractor periodically samples the
+ * timestamps of the elements in a stream and keeps a histogram of the observed lateness. Based on this
+ * histogram, it sets the watermark lateness to the lowest possible value that, at the same time,
+ * guarantees that a user-specified percentage of the elements in the stream are covered and not
+ * dropped due to lateness.
+ *
+ * <p>More precisely, the user specifies i) the duration of the sampling period, ii) that of the interval
+ * between the end of a sampling period and the start of the next one, and iii) the percentage
+ * of elements in the stream (late and non-late) that she wants to be covered, i.e. considered non-late.
+ * Given this information, <b>during the sampling period</b> the extractor keeps a per-second lateness
+ * histogram, i.e. a histogram showing how many elements were 0, 1, 2... seconds late, and the maximum
+ * (event-time) timestamp seen so far. When <b>the sampling period ends</b>, it computes the minimum
+ * lateness that covers the user-specified percentage of data, and whenever a watermark is emitted, its
+ * timestamp is the maximum (event-time) timestamp seen up to that point in the stream, minus the
+ * previously computed value. This value is not updated till the <b>end</b> of the next sampling period.
+ * */
+public abstract class HistogramBasedWatermarkEmitter<T> extends AbstractRichFunction
+ implements AssignerWithPeriodicWatermarks<T>, Triggerable {
+
+ private static final long serialVersionUID = 1L;
+
+ private boolean testing;
+
+ /**
+ * A map holding the histogram. Currently we assume
+ * buckets of 2^10 milliseconds, which is roughly a second.
--- End diff --
Shouldn't the bucket size depend heavily on the application?
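To make the concern concrete, here is a minimal sketch of a lateness histogram whose bucket width is a constructor parameter rather than a fixed 2^10 ms. It is not code from the PR: the class `LatenessHistogram` and its methods `add` and `latenessCovering` are hypothetical names chosen for illustration, and the coverage computation is only one plausible reading of the Javadoc above (the smallest upper bucket edge that covers the requested fraction of sampled elements).

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not part of the PR): lateness histogram with a configurable bucket width. */
final class LatenessHistogram {

    private final long bucketWidthMillis;
    // bucket index -> number of sampled elements whose lateness fell into that bucket
    private final TreeMap<Long, Long> buckets = new TreeMap<>();
    private long total;

    LatenessHistogram(long bucketWidthMillis) {
        if (bucketWidthMillis <= 0) {
            throw new IllegalArgumentException("bucket width must be positive");
        }
        this.bucketWidthMillis = bucketWidthMillis;
    }

    /** Records one element that is latenessMillis behind the maximum timestamp seen so far. */
    void add(long latenessMillis) {
        long bucket = Math.max(0L, latenessMillis) / bucketWidthMillis;
        buckets.merge(bucket, 1L, Long::sum);
        total++;
    }

    /**
     * Returns the smallest lateness in milliseconds (rounded up to a bucket edge)
     * that covers at least the given fraction of the recorded elements.
     */
    long latenessCovering(double coverage) {
        if (coverage <= 0.0 || coverage > 1.0) {
            throw new IllegalArgumentException("coverage must be in (0, 1]");
        }
        if (total == 0) {
            return 0L;
        }
        long needed = (long) Math.ceil(coverage * total);
        long seen = 0;
        for (Map.Entry<Long, Long> e : buckets.entrySet()) {
            seen += e.getValue();
            if (seen >= needed) {
                return (e.getKey() + 1) * bucketWidthMillis;
            }
        }
        // Not reached in practice: the counts sum to total, and needed <= total.
        return (buckets.lastKey() + 1) * bucketWidthMillis;
    }
}
```

With samples that are 0, 50, 120, 250 and 990 ms late, 100 ms buckets give a lateness of 200 ms for 50% coverage, whereas 1024 ms buckets round every answer up to 1024 ms. For a job whose lateness is mostly sub-second, the fixed bucket width would therefore hold the watermark back further than necessary.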