Hi,

If you have modified RecordWriter#randomEmit, then maybe (probably?) the reason is that you are accidentally skipping LatencyMarkers alongside the records. You can track the code path of the emitted LatencyMarkers starting from Output#emitLatencyMarker.
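For illustration, roughly paraphrased from memory of the 1.2 sources (not copied verbatim): the marker ends up in the same RecordWriter as normal records, already wrapped in the shared SerializationDelegate, so a probabilistic skip inside the writer cannot tell the two apart.

// Rough paraphrase of RecordWriterOutput#emitLatencyMarker in Flink 1.2 (from memory,
// not verbatim). The LatencyMarker is wrapped in the same SerializationDelegate that
// also carries user records and handed to the same RecordWriter instance.
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
    serializationDelegate.setInstance(latencyMarker);
    try {
        // same writer that collect() uses for normal records; if the shedding check
        // also covers this call (randomEmit, or a shared sendToTarget path), then
        // latency markers get dropped together with the load
        recordWriter.randomEmit(serializationDelegate);
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    }
}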
I haven’t thought that through, but maybe you should implement your record shedding at the level of RecordWriterOutput (or of all implementations of org.apache.flink.streaming.api.operators.Output?), since it is easier there to differentiate between normal records and LatencyMarkers (a rough sketch follows below the quoted mail).

Piotrek

> On 28 Mar 2018, at 11:44, Luis Alves <lmtjal...@gmail.com> wrote:
>
> Hi,
>
> As part of a project that I'm developing, I'm extending Flink 1.2 to
> support load shedding. I'm doing some performance tests to check the
> performance impact of my changes compared to the Flink 1.2 release.
>
> From the results that I'm getting, I can see that load shedding is working
> and that incoming events are being dropped (the lag in the input Kafka
> topic also remains ~0).
>
> But when I look at the latency in Flink, it seems that when load shedding
> triggers, the latency starts growing to values above 5 seconds (I don't
> see the same behavior on the Flink 1.2 release). Before load shedding
> triggers, the latency remains similar.
>
> Looking at the git diff with the changes that I did on the application
> runtime side, there's only one that is in the critical path of the
> processing pipeline. In RecordWriter.emit
> <https://github.com/apache/flink/blob/066b66dd37030bb94bd179d7fb2280876be038c5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L105>
> I simply added a condition that randomly skips the step of sending the record
> to the target (sendToTarget) with a given probability (dropProbability <
> random.nextDouble()).
>
> Does adding those random drops have side effects on some other component in
> Flink, causing the latencies to increase?
>
> Thanks,
> Luís Alves
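To make the Output-level idea above a bit more concrete, here is a rough, untested sketch. The class name SheddingOutput and the dropProbability field are made up for illustration; it assumes the Flink 1.2 Output interface (collect, emitWatermark, emitLatencyMarker, plus close() from Collector). Only normal StreamRecords are candidates for dropping; LatencyMarkers and Watermarks are always forwarded, so the latency metrics keep measuring what actually flows through.

import java.util.Random;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Hypothetical load-shedding wrapper around an existing Output.
 * It would wrap whatever Output the operator chain hands to an operator.
 */
public class SheddingOutput<T> implements Output<StreamRecord<T>> {

    private final Output<StreamRecord<T>> wrapped;
    private final double dropProbability;
    private final Random random = new Random();

    public SheddingOutput(Output<StreamRecord<T>> wrapped, double dropProbability) {
        this.wrapped = wrapped;
        this.dropProbability = dropProbability;
    }

    @Override
    public void collect(StreamRecord<T> record) {
        // only normal records are subject to shedding
        if (dropProbability < random.nextDouble()) {
            wrapped.collect(record);
        }
    }

    @Override
    public void emitWatermark(Watermark mark) {
        wrapped.emitWatermark(mark); // never dropped
    }

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        wrapped.emitLatencyMarker(latencyMarker); // never dropped
    }

    @Override
    public void close() {
        wrapped.close();
    }
}

How to wire such a wrapper in (e.g. where the OperatorChain builds the outputs) I leave open; the point is only that at this level the element types are still distinguishable, so you can shed records without touching LatencyMarkers.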