My first instinct was latency markers as well, but AFAIK latency
markers are self-contained: they carry the start timestamp from the
source, and each task just measures the difference against its own
clock. Thus, if a marker is dropped, that shouldn't show up as
increased latency; the metric should simply stop being updated.
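To make the argument above concrete, here is a minimal sketch of how a latency marker yields a latency measurement. The `LatencyMarker` class below is a simplified stand-in for Flink's `org.apache.flink.streaming.runtime.streamrecord.LatencyMarker`, not the real type:

```java
public class LatencyMarkerSketch {

    // Stand-in for Flink's LatencyMarker: it only carries the
    // wall-clock timestamp taken at the source.
    static class LatencyMarker {
        private final long markedTime;

        LatencyMarker(long markedTime) {
            this.markedTime = markedTime;
        }

        long getMarkedTime() {
            return markedTime;
        }
    }

    // Each downstream task computes latency as the difference between
    // its own clock and the timestamp carried by the marker. A marker
    // that never arrives simply never updates the metric.
    static long measureLatency(LatencyMarker marker, long now) {
        return now - marker.getMarkedTime();
    }

    public static void main(String[] args) {
        LatencyMarker marker = new LatencyMarker(1_000L);
        System.out.println(measureLatency(marker, 1_250L)); // prints 250
    }
}
```

The point of the sketch: the measurement is self-contained per marker, so dropping markers starves the metric of updates rather than inflating it.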
How do you measure the latency to begin with? Do you use the latency
metric, or do you measure it manually?
If it is measured manually, couldn't this be caused by dropping
watermarks, making window operations take longer than expected?
On 28.03.2018 14:39, Piotr Nowojski wrote:
Hi,
If you have modified RecordWriter#randomEmit then maybe (probably?) the reason
is that you are accidentally skipping LatencyMarkers alongside records. You
can trace the code path of emitting LatencyMarkers from
Output#emitLatencyMarker.
I haven’t thought that through, but maybe you should implement your record
shedding at the level of RecordWriterOutput (or in all implementations of
org.apache.flink.streaming.api.operators.Output?), because it’s easier there to
differentiate between normal records and LatencyMarkers.
Piotrek
On 28 Mar 2018, at 11:44, Luis Alves <lmtjal...@gmail.com> wrote:
Hi,
As part of a project that I'm developing, I'm extending Flink 1.2 to
support load shedding. I'm doing some performance tests to check the
performance impact of my changes compared to Flink 1.2 release.
From the results that I'm getting, I can see that load shedding is working
and that incoming events are being dropped (the lag in the input Kafka
topic also remains ~0).
But when I look at the latency in Flink, it seems that when load shedding
triggers, the latency grows to values above 5 seconds (I don't see the
same behavior on the Flink 1.2 release). Before load shedding triggers,
the latencies are similar.
Looking at the git diff of the changes I made on the runtime side, only
one is in the critical path of the processing pipeline. In RecordWriter.emit
<https://github.com/apache/flink/blob/066b66dd37030bb94bd179d7fb2280876be038c5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L105>
I simply add a condition that randomly skips the step of sending the record
to the target (sendToTarget) with a given probability (dropProbability <
random.nextDouble()).
Does adding those random drops have side effects on some other component in
Flink, causing the latencies to increase?
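A hedged sketch of the modification described above, to show why it can interact with latency tracking. The names `emit` and `sendToTarget` follow the description in this thread; everything else is a simplified stand-in, and `draw` replaces `random.nextDouble()` to keep the example deterministic:

```java
public class RandomDropEmitSketch {

    static int sent = 0;

    // Stand-in for RecordWriter's sendToTarget step.
    static void sendToTarget(Object element) {
        sent++;
    }

    // The described change: sendToTarget is skipped unless
    // dropProbability < draw. Note the condition is applied to
    // *every* element passing through emit, with no check for
    // whether the element is a normal record or a latency marker.
    static void emit(Object element, double dropProbability, double draw) {
        if (dropProbability < draw) {
            sendToTarget(element);
        }
        // otherwise the element is silently dropped
    }

    public static void main(String[] args) {
        emit("latency-marker", 1.0, 0.5); // 1.0 < 0.5 is false: dropped
        emit("record", 0.0, 0.5);         // 0.0 < 0.5 is true: sent
        System.out.println(sent); // prints 1
    }
}
```

If latency markers travel through the same emit path, they are shed with the same probability as records, which matches the suspicion raised earlier in the thread.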
Thanks,
Luís Alves