Hi,

If you have modified RecordWriter#randomEmit, then maybe (probably?) the reason 
is that you are accidentally skipping LatencyMarkers alongside normal records. 
You can trace the code path of emitting LatencyMarkers starting from 
Output#emitLatencyMarker.
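
For reference, the interface I'm referring to has roughly the following shape 
in the 1.2-era sources (paraphrased from memory, so please double-check against 
your branch; the fully qualified name is 
org.apache.flink.streaming.api.operators.Output):

import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.util.Collector;

/** Adds the "control element" emit methods on top of Collector#collect/#close. */
public interface Output<T> extends Collector<T> {

    /** Emits a Watermark from the operator that owns this output. */
    void emitWatermark(Watermark mark);

    /** Emits a LatencyMarker; this is the entry point of the latency-tracking path. */
    void emitLatencyMarker(LatencyMarker latencyMarker);
}

Following emitLatencyMarker from its implementations (e.g. RecordWriterOutput) 
down into the RecordWriter shows where the markers join the same path as your 
records.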

I haven’t thought that through, but maybe you should implement your record 
shedding at the level of RecordWriterOutput (or in all implementations of 
org.apache.flink.streaming.api.operators.Output?), because it is easier there to 
differentiate between normal records and LatencyMarkers.
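
To make the idea concrete, here is a rough, untested sketch of what shedding at 
that level could look like (assuming the 1.2-era Output interface with only 
collect/emitWatermark/emitLatencyMarker/close; the class and field names are 
just placeholders):

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Wraps an existing Output and probabilistically drops normal records only. */
public class SheddingOutput<T> implements Output<StreamRecord<T>> {

    private final Output<StreamRecord<T>> wrapped;
    private final double dropProbability; // e.g. 0.1 sheds ~10% of records

    public SheddingOutput(Output<StreamRecord<T>> wrapped, double dropProbability) {
        this.wrapped = wrapped;
        this.dropProbability = dropProbability;
    }

    @Override
    public void collect(StreamRecord<T> record) {
        // Only user records are subject to shedding: forward the record only
        // when the random draw exceeds the drop probability (the same kind of
        // condition you describe for your RecordWriter change).
        if (dropProbability < ThreadLocalRandom.current().nextDouble()) {
            wrapped.collect(record);
        }
    }

    @Override
    public void emitWatermark(Watermark mark) {
        // Never drop watermarks, otherwise event-time progress would stall.
        wrapped.emitWatermark(mark);
    }

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        // Never drop LatencyMarkers, otherwise the latency metrics get distorted.
        wrapped.emitLatencyMarker(latencyMarker);
    }

    @Override
    public void close() {
        wrapped.close();
    }
}

The point is simply that at this level the different element types arrive 
through separate methods, so you don't have to unwrap anything to tell a normal 
record from a LatencyMarker, as you would inside the RecordWriter.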

Piotrek

> On 28 Mar 2018, at 11:44, Luis Alves <lmtjal...@gmail.com> wrote:
> 
> Hi,
> 
> As part of a project that I'm developing, I'm extending Flink 1.2 to
> support load shedding. I'm running some performance tests to check the
> performance impact of my changes compared to the Flink 1.2 release.
> 
> From the results that I'm getting, I can see that load shedding is working
> and that incoming events are being dropped (the lag in the input Kafka
> topic also remains ~0).
> 
> But when I look at the latency in Flink, it seems that when load shedding
> triggers, the latency starts growing to values above 5 seconds (I don't
> see the same behavior on the Flink 1.2 release). Before load shedding
> triggers, the latency remains similar in both versions.
> 
> Looking at the git diff of the changes that I made on the application
> runtime side, there's only one change that is in the critical path of the
> processing pipeline. In RecordWriter.emit
> <https://github.com/apache/flink/blob/066b66dd37030bb94bd179d7fb2280876be038c5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L105>
> I simply added a condition that randomly skips the step of sending the record
> to the target (sendToTarget) with a given probability (dropProbability <
> random.nextDouble()).
> 
> Does adding those random drops have side effects on some other component in
> Flink, causing the latencies to increase?
> 
> Thanks,
> Luís Alves
