I could, but the objective is to have minimal changes on the user level.
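
For reference, the user-level alternative (a randomly filtering operator) could be sketched roughly as below. This is plain Java so that it runs on its own; `RandomShedFilter` and its constructor parameters are hypothetical names, not Flink classes, and the fixed seed is only there to make the behaviour repeatable.

```java
import java.util.Random;
import java.util.function.Predicate;

/** Hypothetical helper (not a Flink class): keeps a record with probability 1 - dropProbability. */
class RandomShedFilter<T> implements Predicate<T> {
    private final double dropProbability;
    private final Random random;

    RandomShedFilter(double dropProbability, long seed) {
        if (dropProbability < 0.0 || dropProbability > 1.0) {
            throw new IllegalArgumentException("dropProbability must be in [0, 1]");
        }
        this.dropProbability = dropProbability;
        this.random = new Random(seed);
    }

    /** Returns true when the record should be kept, false when it should be shed. */
    @Override
    public boolean test(T record) {
        // nextDouble() is uniform in [0, 1), so this is false with probability dropProbability.
        return random.nextDouble() >= dropProbability;
    }
}
```

In a Flink job the same check would presumably go into the `filter(T value)` method of a `FilterFunction<T>` passed to `DataStream#filter`, which only sees normal records.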

Luís Alves

2018-03-28 16:45 GMT+01:00 Piotr Nowojski <pi...@data-artisans.com>:

> Yes, they are using randomEmit, so if you didn’t add this randomised
> record dropping to it, my remark would be invalid (and consistent with
> what Chesnay wrote).
>
> Besides the questions asked by Chesnay, wouldn’t it be safer to implement
> record shedding at the user level, in the form of a randomly filtering
> operator?
>
> Piotrek
>
> > On 28 Mar 2018, at 15:49, Luis Alves <lmtjal...@gmail.com> wrote:
> >
> > @Chesnay I tried both approaches, using the latency metric and manually by
> > adding a timestamp to each record.
> > @Piotr I can try to do the random drops in the RecordWriterOutput, but
> > don't the latency markers use the randomEmit method instead of emit?
> >
> > 2018-03-28 14:26 GMT+01:00 Chesnay Schepler <ches...@apache.org>:
> >
> >> My first instinct was latency markers as well, but AFAIK latency markers
> >> are self-contained; they contain the start timestamp from the source and
> >> we just measure the diff at each task. Thus, if a marker is dropped, it
> >> shouldn't show up as increased latency in the metrics; the metrics
> >> should simply not be updated.
> >>
> >> How do you measure the latency to begin with? Do you use the latency
> >> metric or do you measure it manually?
> >> If it is done manually, couldn't this be due to dropped watermarks,
> >> causing window operations to take longer than expected?
> >>
> >>
> >> On 28.03.2018 14:39, Piotr Nowojski wrote:
> >>
> >>> Hi,
> >>>
> >>> If you have modified RecordWriter#randomEmit then maybe (probably?) the
> >>> reason is that you are accidentally skipping LatencyMarkers alongside
> >>> records. You can track the code path of emitting LatencyMarkers from
> >>> Output#emitLatencyMarker.
> >>>
> >>> I haven’t thought that through, but maybe you should implement your
> >>> record shedding on a level of RecordWriterOutput (or all
> >>> implementations of the
> >>> org.apache.flink.streaming.api.operators.Output?), because it’s easier
> >>> there to differentiate between normal records and LatencyMarkers.
> >>>
> >>> Piotrek
> >>>
> >>> On 28 Mar 2018, at 11:44, Luis Alves <lmtjal...@gmail.com> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> As part of a project that I'm developing, I'm extending Flink 1.2 to
> >>>> support load shedding. I'm doing some performance tests to check the
> >>>> performance impact of my changes compared to the Flink 1.2 release.
> >>>>
> >>>> From the results that I'm getting, I can see that load shedding is
> >>>> working and that incoming events are being dropped (the lag in the
> >>>> input Kafka topic also remains ~0).
> >>>>
> >>>> But when I look at the latency in Flink, it seems that when load
> >>>> shedding triggers, the latency starts growing to values above 5 seconds
> >>>> (I don't see the same behavior on the Flink 1.2 release). Before load
> >>>> shedding triggers, the latency remains similar.
> >>>>
> >>>> Looking at the git diff with the changes that I did on the application
> >>>> runtime side, there's only one that is in the critical path of the
> >>>> processing pipeline. In the RecordWriter.emit
> >>>> <https://github.com/apache/flink/blob/066b66dd37030bb94bd179d7fb2280876be038c5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L105>
> >>>> I simply add a condition that randomly skips the step of sending the
> >>>> record to the target (sendToTarget) with a given probability
> >>>> (dropProbability < random.nextDouble()).
> >>>>
> >>>> Does adding those random drops have side effects on some other
> >>>> component in Flink, causing the latencies to increase?
> >>>>
> >>>> Thanks,
> >>>> Luís Alves
> >>>>
> >>>
> >>>
> >>
>
>

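To illustrate the suggestion made earlier in the thread (shed only normal records and let LatencyMarkers through), here is a minimal sketch. All names are hypothetical stand-ins rather than Flink's classes: `SheddingOutput` only mimics the idea of an output with separate paths for records and latency markers, and `latencyMillis` mirrors the description of a marker carrying its source timestamp, with the diff measured at each task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical stand-in for an output with load shedding; not Flink's RecordWriterOutput. */
class SheddingOutput {
    private final double dropProbability;
    private final Random random;
    /** Everything that was actually forwarded downstream, in order. */
    final List<String> sent = new ArrayList<>();

    SheddingOutput(double dropProbability, long seed) {
        this.dropProbability = dropProbability;
        this.random = new Random(seed);
    }

    /** Normal records are shed with probability dropProbability. */
    void collect(String record) {
        if (random.nextDouble() < dropProbability) {
            return; // shed this record
        }
        sent.add("record:" + record);
    }

    /** Latency markers bypass the shedding check and are always forwarded. */
    void emitLatencyMarker(long markedTimeMillis) {
        sent.add("marker:" + markedTimeMillis);
    }

    /** Latency at a task: current time minus the timestamp the marker carries from the source. */
    static long latencyMillis(long markedTimeMillis, long nowMillis) {
        return nowMillis - markedTimeMillis;
    }
}
```

With the two paths separated like this, a dropped record never takes a marker with it, so the latency metric keeps being updated while shedding is active.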