@Chesnay I tried both approaches: the latency metric, and measuring manually by adding a timestamp to each record.

@Piotr I can try doing the random drops in RecordWriterOutput, but don't latency markers go through the randomEmit method rather than emit?
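Piotr's suggestion is to shed at the Output level (for example RecordWriterOutput), because records, latency markers and watermarks arrive there through separate methods. The sketch below models that idea under some assumptions. Every type in it (DataRecord, LatencyMarker, Watermark, SheddingOutput) is a hypothetical stand-in, not a Flink class. The method names collect / emitLatencyMarker / emitWatermark only mirror the Output interface mentioned in the thread, with simplified signatures. Only data records are shed. Markers and watermarks are always forwarded, so the shedding decision can never drop them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SheddingOutputSketch {

    // Hypothetical stand-ins for Flink's stream elements (NOT the real Flink classes).
    interface Element {}

    static final class DataRecord implements Element {
        final long value;
        DataRecord(long value) { this.value = value; }
    }

    static final class LatencyMarker implements Element {
        final long markedTime; // timestamp assigned at the source
        LatencyMarker(long markedTime) { this.markedTime = markedTime; }
    }

    static final class Watermark implements Element {
        final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
    }

    // Hypothetical shedding output: sheds data records only.
    static final class SheddingOutput {
        private final List<Element> downstream;
        private final double dropProbability;
        private final Random random;

        SheddingOutput(List<Element> downstream, double dropProbability, Random random) {
            this.downstream = downstream;
            this.dropProbability = dropProbability;
            this.random = random;
        }

        void collect(DataRecord record) {
            // Drop with probability dropProbability; otherwise forward the record.
            if (random.nextDouble() < dropProbability) {
                return;
            }
            downstream.add(record);
        }

        // Control elements bypass the shedding decision entirely.
        void emitLatencyMarker(LatencyMarker marker) { downstream.add(marker); }
        void emitWatermark(Watermark watermark) { downstream.add(watermark); }
    }

    // Emits `records` data records, plus one latency marker and one watermark every 10 records.
    static List<Element> simulate(int records, double dropProbability, long seed) {
        List<Element> downstream = new ArrayList<>();
        SheddingOutput out = new SheddingOutput(downstream, dropProbability, new Random(seed));
        for (int i = 0; i < records; i++) {
            out.collect(new DataRecord(i));
            if (i % 10 == 0) {
                out.emitLatencyMarker(new LatencyMarker(i));
                out.emitWatermark(new Watermark(i));
            }
        }
        return downstream;
    }

    static long count(List<Element> elements, Class<?> type) {
        return elements.stream().filter(type::isInstance).count();
    }

    public static void main(String[] args) {
        List<Element> result = simulate(1000, 0.5, 42L);
        System.out.println("records forwarded: " + count(result, DataRecord.class));
        System.out.println("latency markers forwarded: " + count(result, LatencyMarker.class));
        System.out.println("watermarks forwarded: " + count(result, Watermark.class));
    }
}
```

Even with a drop probability of 1.0, all 100 markers and 100 watermarks in this simulation reach downstream. That is the property a per-element-type decision buys compared with a single drop check in the writer.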
2018-03-28 14:26 GMT+01:00 Chesnay Schepler <ches...@apache.org>:

> My first instinct was latency markers as well, but AFAIK latency markers
> are self-contained: they carry the start timestamp from the source, and we
> just measure the difference at each task. Thus, if a marker is dropped, this
> shouldn't show up as increased latency; the latency metrics should simply
> not be updated instead.
>
> How do you measure the latency to begin with? Do you use the latency
> metric, or do you measure it manually?
> If it is done manually, couldn't this be due to dropped watermarks,
> causing window operations to take longer than expected?
>
>
> On 28.03.2018 14:39, Piotr Nowojski wrote:
>
>> Hi,
>>
>> If you have modified RecordWriter#randomEmit, then maybe (probably?) the
>> reason is that you are accidentally skipping LatencyMarkers alongside
>> records. You can trace the code path for emitting LatencyMarkers starting
>> from Output#emitLatencyMarker.
>>
>> I haven’t thought that through, but maybe you should implement your
>> record shedding at the level of RecordWriterOutput (or of all implementations
>> of org.apache.flink.streaming.api.operators.Output?), because it’s
>> easier there to differentiate between normal records and LatencyMarkers.
>>
>> Piotrek
>>
>> On 28 Mar 2018, at 11:44, Luis Alves <lmtjal...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> As part of a project that I'm developing, I'm extending Flink 1.2 to
>>> support load shedding. I'm running some performance tests to check the
>>> performance impact of my changes compared to the Flink 1.2 release.
>>>
>>> From the results that I'm getting, I can see that load shedding is working
>>> and that incoming events are being dropped (the lag on the input Kafka
>>> topic also remains ~0).
>>>
>>> But when I look at the latency in Flink, it seems that once load shedding
>>> triggers, the latency starts growing to values above 5 seconds (I don't
>>> see the same behavior with the Flink 1.2 release). Before load shedding
>>> triggers, the latency is similar in both versions.
>>>
>>> Looking at the git diff of the changes that I made on the runtime side,
>>> there's only one that is in the critical path of the processing pipeline.
>>> In RecordWriter.emit
>>> <https://github.com/apache/flink/blob/066b66dd37030bb94bd179d7fb2280876be038c5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L105>
>>> I simply add a condition that randomly skips the step of sending the record
>>> to the target (sendToTarget) with a given probability (dropProbability <
>>> random.nextDouble()).
>>>
>>> Does adding those random drops have side effects on some other component
>>> in Flink, causing the latencies to increase?
>>>
>>> Thanks,
>>> Luís Alves
>>
>>
>
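To reason about the RecordWriter.emit change described in the original message, here is a small self-contained model of it. It is a sketch under assumptions, not Flink 1.2's RecordWriter. SheddingRecordWriter, ChannelSelector, run and the list-backed "channels" are hypothetical simplifications. Only the condition dropProbability < random.nextDouble() comes from the email. It is assumed to guard the send, so a record is dropped with probability of about dropProbability. In this sketch the decision is made once per record, before looping over the selected channels, so a record routed to several channels reaches all of them or none.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical, simplified model of a record writer with random load shedding.
// NOT Flink 1.2's RecordWriter; only the drop condition is taken from the email.
public class RandomDropWriterSketch {

    // Hypothetical channel selector: returns the output channel(s) for a record.
    interface ChannelSelector<T> {
        int[] selectChannels(T record, int numChannels);
    }

    static final class SheddingRecordWriter<T> {
        final List<List<T>> channels = new ArrayList<>();
        private final ChannelSelector<T> selector;
        private final double dropProbability;
        private final Random random;
        long dropped = 0;

        SheddingRecordWriter(int numChannels, ChannelSelector<T> selector,
                             double dropProbability, Random random) {
            for (int i = 0; i < numChannels; i++) {
                channels.add(new ArrayList<>());
            }
            this.selector = selector;
            this.dropProbability = dropProbability;
            this.random = random;
        }

        void emit(T record) {
            // Send only when dropProbability < random.nextDouble() (the condition from the
            // email); otherwise drop, i.e. with probability of about dropProbability.
            // One decision per record, taken before the channel loop.
            if (!(dropProbability < random.nextDouble())) {
                dropped++;
                return;
            }
            for (int channel : selector.selectChannels(record, channels.size())) {
                sendToTarget(record, channel);
            }
        }

        private void sendToTarget(T record, int channel) {
            channels.get(channel).add(record); // stand-in for serializing into a network buffer
        }
    }

    // Emits the integers 0..n-1, either partitioned by value or broadcast to every channel.
    static SheddingRecordWriter<Integer> run(int n, int numChannels, boolean broadcast,
                                             double dropProbability, long seed) {
        ChannelSelector<Integer> selector;
        if (broadcast) {
            selector = (r, k) -> {
                int[] all = new int[k];
                for (int c = 0; c < k; c++) {
                    all[c] = c;
                }
                return all;
            };
        } else {
            selector = (r, k) -> new int[] {r % k}; // records are non-negative here
        }
        SheddingRecordWriter<Integer> writer =
                new SheddingRecordWriter<>(numChannels, selector, dropProbability, new Random(seed));
        for (int i = 0; i < n; i++) {
            writer.emit(i);
        }
        return writer;
    }

    public static void main(String[] args) {
        SheddingRecordWriter<Integer> writer = run(10_000, 4, false, 0.3, 7L);
        System.out.println("dropped " + writer.dropped + " of 10000 records");
    }
}
```

Placing the check inside the channel loop instead would make the decision per channel, so a broadcast record could reach some downstream channels and not others.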