ASF GitHub Bot commented on FLINK-3660:

Github user rmetzger commented on a diff in the pull request:

    --- Diff: 
    @@ -273,7 +276,7 @@ public void releaseOutputs() {
                // now create the operator and give it the output collector to write its output to
                OneInputStreamOperator<IN, OUT> chainedOperator = 
    -           chainedOperator.setup(containingTask, operatorConfig, output);
    +           chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
    --- End diff --
    We still differentiate between the two because we don't want to keep latency
    statistics for every parallel source instance at the intermediate operators.
    So in the current implementation:
    - Intermediate operators maintain latency statistics for each logical
      source (so not much data)
    - Sinks maintain latency statistics for all parallel instances of each
      logical source (a bit more data)
    The sinks maintain the additional data so that users can debug latency
    issues with individual parallel instances (for example, when a machine
    running a source has issues).
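
The difference in granularity described above can be sketched roughly as follows. This is a minimal illustration, not the code from the pull request: the class `LatencyStatsSketch`, its methods, and the string key format are all hypothetical, and the `isSink` flag only mirrors the role that the `streamOutputs.size() == 0` argument in the diff appears to play.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of per-operator latency statistics; not Flink's actual classes. */
class LatencyStatsSketch {
    private final boolean isSink;
    // key -> {number of samples, sum of latencies in milliseconds}
    private final Map<String, long[]> stats = new HashMap<>();

    LatencyStatsSketch(boolean isSink) {
        this.isSink = isSink;
    }

    /** Records one latency sample for a marker that originated at the given source subtask. */
    void report(int sourceId, int sourceSubtaskIndex, long latencyMillis) {
        // Sinks key by (logical source, parallel instance);
        // intermediate operators key by logical source only.
        String key = isSink
                ? sourceId + "/" + sourceSubtaskIndex
                : String.valueOf(sourceId);
        long[] entry = stats.computeIfAbsent(key, k -> new long[2]);
        entry[0]++;
        entry[1] += latencyMillis;
    }

    /** Number of distinct keys for which statistics are kept. */
    int trackedKeys() {
        return stats.size();
    }

    /** Mean latency for a key, or NaN if nothing was recorded for it. */
    double meanLatency(String key) {
        long[] entry = stats.get(key);
        return entry == null ? Double.NaN : (double) entry[1] / entry[0];
    }
}
```

With a source of parallelism 4, an intermediate operator built this way would hold a single entry for that source, while a sink would hold four, which is what makes it possible to spot one slow source instance at the sink.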

> Measure latency of elements and expose it through web interface
> ---------------------------------------------------------------
>                 Key: FLINK-3660
>                 URL: https://issues.apache.org/jira/browse/FLINK-3660
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>             Fix For: pre-apache
> It would be nice to expose the end-to-end latency of a streaming job in the 
> web interface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> at the sources to each record.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes for each element + System.currentTimeMillis() on each 
> element).
> Therefore, I suggest implementing this feature by periodically sending 
> special events, similar to watermarks, through the topology. 
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be 
> delayed similarly to regular records, so their latency will approximate the 
> record latency.
> The above suggestion expects the clocks on all taskmanagers to be in sync. 
> Otherwise, the measured latencies would also include the offsets between the 
> taskmanagers' clocks.
> In a second step, we can try to mitigate the issue by using the JobManager as 
> a central timing service. The TaskManagers will periodically query the JM for 
> the current time in order to determine the offset with their clock.
> This offset would still include the network latency between TM and JM, but it 
> should lead to reasonably good estimations.
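
The latency computation and the clock-offset idea from the description can be sketched as below. This is only an illustration under stated assumptions: `LatencyMarkSketch` and all of its members are hypothetical names, not Flink API. The offset estimate takes the midpoint of the request round trip, which is an assumption that goes slightly beyond the issue text (the issue only says the TaskManager queries the JobManager for its time); the estimate is still off by up to half the round-trip time when the network delay is asymmetric.

```java
/** Hypothetical sketch of latency marks and clock-offset correction; not Flink API. */
final class LatencyMarkSketch {

    /** Marker emitted periodically at a source, carrying the source's local emission time. */
    static final class LatencyMark {
        final long markedTimeMillis;

        LatencyMark(long markedTimeMillis) {
            this.markedTimeMillis = markedTimeMillis;
        }
    }

    /**
     * Estimates (JobManager clock - local clock) in milliseconds.
     *
     * @param requestSentMillis      local time when the time query was sent
     * @param jobManagerTimeMillis   time reported by the JobManager
     * @param responseReceivedMillis local time when the answer arrived
     */
    static long estimateOffset(long requestSentMillis,
                               long jobManagerTimeMillis,
                               long responseReceivedMillis) {
        // Assumption: the JobManager read its clock halfway through the round trip.
        long localMidpoint = (requestSentMillis + responseReceivedMillis) / 2;
        return jobManagerTimeMillis - localMidpoint;
    }

    /**
     * Latency observed at a sink. Both timestamps are first mapped onto the
     * JobManager's clock using each machine's estimated offset.
     */
    static long latencyAtSink(LatencyMark mark,
                              long sinkNowMillis,
                              long sourceOffsetMillis,
                              long sinkOffsetMillis) {
        long emittedOnJmClock = mark.markedTimeMillis + sourceOffsetMillis;
        long receivedOnJmClock = sinkNowMillis + sinkOffsetMillis;
        return receivedOnJmClock - emittedOnJmClock;
    }
}
```

Passing zero for both offsets gives the first-step behaviour from the description, where the sink simply subtracts the mark's timestamp from its own system time and any clock skew between the machines ends up in the measured latency.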

This message was sent by Atlassian JIRA