I could, but the objective is to have minimal changes on the user level.

Luís Alves
2018-03-28 16:45 GMT+01:00 Piotr Nowojski <pi...@data-artisans.com>:

> Yes, they are using randomEmit, so if you didn’t add this randomised
> records dropping in it, my remark would be invalid (and consistent with
> what Chesnay wrote).
>
> Besides the questions asked by Chesnay, wouldn’t it be safer to implement
> record shedding on the user level, in the form of a randomly filtering
> operator?
>
> Piotrek
>
>> On 28 Mar 2018, at 15:49, Luis Alves <lmtjal...@gmail.com> wrote:
>>
>> @Chesnay I tried both approaches, using the latency metric and manually
>> by adding a timestamp to each record.
>> @Piotr I can try to do the random drops in the RecordWriterOutput, but
>> don't the latency markers use the randomEmit method instead of emit?
>>
>> 2018-03-28 14:26 GMT+01:00 Chesnay Schepler <ches...@apache.org>:
>>
>>> My first instinct was latency markers as well, but AFAIK latency
>>> markers are self-contained; they contain the start timestamp from the
>>> source, and we just measure the diff at each task. Thus, if a marker is
>>> dropped, it shouldn't show up as increased latency metrics; the metrics
>>> should simply not be updated.
>>>
>>> How do you measure the latency to begin with? Do you use the latency
>>> metric or do you measure it manually?
>>> If it is done manually, couldn't this be due to dropping watermarks,
>>> causing window operations to take longer than expected?
>>>
>>> On 28.03.2018 14:39, Piotr Nowojski wrote:
>>>
>>>> Hi,
>>>>
>>>> If you have modified RecordWriter#randomEmit then maybe (probably?)
>>>> the reason is that you are accidentally skipping LatencyMarkers
>>>> alongside records. You can track the code path of emitting
>>>> LatencyMarkers from Output#emitLatencyMarker.
>>>>
>>>> I haven’t thought that through, but maybe you should implement your
>>>> record shedding at the level of RecordWriterOutput (or all
>>>> implementations of org.apache.flink.streaming.api.operators.Output?),
>>>> because it’s easier there to differentiate between normal records and
>>>> LatencyMarkers.
>>>>
>>>> Piotrek
>>>>
>>>>> On 28 Mar 2018, at 11:44, Luis Alves <lmtjal...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> As part of a project that I'm developing, I'm extending Flink 1.2 to
>>>>> support load shedding. I'm running some performance tests to check
>>>>> the performance impact of my changes compared to the Flink 1.2
>>>>> release.
>>>>>
>>>>> From the results that I'm getting, I can see that load shedding is
>>>>> working and that incoming events are being dropped (the lag on the
>>>>> input Kafka topic also remains ~0).
>>>>>
>>>>> But when I look at the latency in Flink, it seems that when load
>>>>> shedding triggers, the latency starts growing to values above 5
>>>>> seconds (I don't see the same behavior on the Flink 1.2 release).
>>>>> Before load shedding triggers, the latency remains similar.
>>>>>
>>>>> Looking at the git diff with the changes that I made on the runtime
>>>>> side, there's only one that is in the critical path of the
>>>>> processing pipeline. In RecordWriter.emit
>>>>> <https://github.com/apache/flink/blob/066b66dd37030bb94bd179d7fb2280876be038c5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L105>
>>>>> I simply add a condition that randomly skips the step of sending the
>>>>> record to the target (sendToTarget) with a given probability
>>>>> (dropProbability < random.nextDouble()).
>>>>>
>>>>> Does adding those random drops have side effects on some other
>>>>> component in Flink, causing the latencies to increase?
>>>>>
>>>>> Thanks,
>>>>> Luís Alves
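For readers following along: the shedding logic discussed in this thread boils down to one probabilistic check. Below is a minimal, standalone sketch of that check and of the user-level "randomly filtering operator" Piotrek suggests. This is not the actual patch; the class and method names (`RandomShedder`, `keep`) are illustrative, and only the condition `dropProbability < random.nextDouble()` is taken verbatim from the thread.

```java
import java.util.Random;

// Illustrative sketch, not the actual Flink change discussed above.
// The forwarding condition mirrors the one quoted in the thread:
// a record is kept only when dropProbability < random.nextDouble().
class RandomShedder {

    private final double dropProbability;
    private final Random random = new Random();

    RandomShedder(double dropProbability) {
        this.dropProbability = dropProbability;
    }

    // Deterministic core of the check, split out so it can be verified
    // without randomness: keep the record iff the random roll exceeds
    // the configured drop probability.
    static boolean keep(double dropProbability, double roll) {
        return dropProbability < roll;
    }

    // Per-record decision, as a user-level filtering operator would make
    // it. In a Flink job this predicate would live inside a
    // FilterFunction, where it can only ever see user records:
    // LatencyMarkers and watermarks bypass user functions entirely, so
    // they cannot be dropped by accident, unlike in RecordWriter#emit.
    boolean keep() {
        return keep(dropProbability, random.nextDouble());
    }
}
```

In a job this could be wired up as `stream.filter(r -> shedder.keep())`, which is the safety argument behind the user-level suggestion: shedding happens before the network stack, so the latency-marker path (Output#emitLatencyMarker through RecordWriter#randomEmit) is untouched and the latency metric stays meaningful.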