@Chesnay I tried both approaches: using the latency metric, and measuring
manually by adding a timestamp to each record.
@Piotr I can try to do the random drops in the RecordWriterOutput, but
don't the latency markers use the randomEmit method instead of emit?
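
For reference, here is a minimal sketch of the idea of shedding at the output level, where records and latency markers arrive through separate methods. It is not Flink code: DataRecord, LatencyProbe and SheddingOutput are hypothetical stand-ins that I made up for illustration, and the real RecordWriterOutput has a different shape. The comparison is written as random.nextDouble() < dropProbability so that a fraction dropProbability of the records is dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
interface Element {}

final class DataRecord implements Element {
    final String value;

    DataRecord(String value) {
        this.value = value;
    }
}

final class LatencyProbe implements Element {
    // Timestamp assigned at the source; downstream tasks only compute a diff.
    final long markedTimeMillis;

    LatencyProbe(long markedTimeMillis) {
        this.markedTimeMillis = markedTimeMillis;
    }
}

// Hypothetical output wrapper: sheds data records, never latency probes.
final class SheddingOutput {
    private final double dropProbability;
    private final Random random;

    // Stands in for "sent to the target"; a real output would write to the network.
    final List<Element> forwarded = new ArrayList<>();

    SheddingOutput(double dropProbability, long seed) {
        this.dropProbability = dropProbability;
        this.random = new Random(seed);
    }

    // Data path: nextDouble() is uniform in [0, 1), so this branch is taken
    // with probability dropProbability and the record is skipped.
    void collect(DataRecord record) {
        if (random.nextDouble() < dropProbability) {
            return;
        }
        forwarded.add(record);
    }

    // Marker path: always forwarded, so the latency measurement is unaffected
    // by the shedding decision.
    void emitLatencyMarker(LatencyProbe probe) {
        forwarded.add(probe);
    }
}
```

With dropProbability set to 1.0 every record is skipped and only the probes reach forwarded; with 0.0 nothing is dropped. Whether the same separation is enough in the real code path depends on how the markers are emitted there, which is what I am asking about above.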

2018-03-28 14:26 GMT+01:00 Chesnay Schepler <ches...@apache.org>:

> My first instinct was latency markers as well, but AFAIK latency markers
> are self-contained; they contain the start timestamp from the source and we
> just measure the diff at each task. Thus, if a marker is dropped, that
> shouldn't show up as increased latency metrics; the metrics should simply
> not be updated.
>
> How do you measure the latency to begin with? Do you use the latency
> metric or do you measure it manually?
> If it is done manually, couldn't this be caused by dropped watermarks,
> making window operations take longer than expected?
>
>
> On 28.03.2018 14:39, Piotr Nowojski wrote:
>
>> Hi,
>>
>> If you have modified RecordWriter#randomEmit then maybe (probably?) the
>> reason is that you are accidentally skipping LatencyMarkers alongside
>> records. You can track the code path of emitting LatencyMarkers from
>> Output#emitLatencyMarker.
>>
>> I haven’t thought that through, but maybe you should implement your
>> record shedding at the level of RecordWriterOutput (or all implementations
>> of org.apache.flink.streaming.api.operators.Output?), because it’s
>> easier there to differentiate between normal records and LatencyMarkers.
>>
>> Piotrek
>>
>> On 28 Mar 2018, at 11:44, Luis Alves <lmtjal...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> As part of a project that I'm developing, I'm extending Flink 1.2 to
>>> support load shedding. I'm doing some performance tests to check the
>>> performance impact of my changes compared to the Flink 1.2 release.
>>>
>>> From the results that I'm getting, I can see that load shedding is
>>> working and that incoming events are being dropped (the lag in the
>>> input Kafka topic also remains ~0).
>>>
>>> But when I look at the latency in Flink, it seems that when load shedding
>>> triggers, the latency starts growing to values above 5 seconds (I don't
>>> see the same behavior on the Flink 1.2 release). Before load shedding
>>> triggers, the latency remains similar.
>>>
>>> Looking at the git diff with the changes that I did on the application
>>> runtime side, there's only one that is in the critical path of the
>>> processing pipeline. In the RecordWriter.emit
>>> <https://github.com/apache/flink/blob/066b66dd37030bb94bd179d7fb2280876be038c5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L105>
>>> I simply add a condition that randomly skips the step of sending the
>>> record to the target (sendToTarget) with a given probability
>>> (dropProbability < random.nextDouble()).
>>>
>>> Does adding those random drops have side effects on some other component
>>> in Flink, causing the latencies to increase?
>>>
>>> Thanks,
>>> Luís Alves
>>>
>>
>>
>
