[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473760#comment-15473760 ]
ASF GitHub Bot commented on FLINK-3660:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2386#discussion_r77996891
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -94,20 +102,38 @@
     /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
     private transient KeyedStateBackend<?> keyedStateBackend;
-    protected transient MetricGroup metrics;
+    // --------------- Metrics ---------------------------
+
+    /** Metric group for the operator */
+    protected MetricGroup metrics;
+
+    /** Flag indicating if this operator is a sink */
+    protected transient boolean isSink = false;
+
+    protected LatencyGauge latencyGauge;
+
     // ------------------------------------------------------------------------
     // Life Cycle
     // ------------------------------------------------------------------------
     @Override
-    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output, boolean isSink) {
         this.container = containingTask;
         this.config = config;
         String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
         this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
         this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
+        this.isSink = isSink;
+        Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
+        int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
+        if(historySize <= 0) {
+            LOG.warn("{} has been set to a value below 0: {}. Using default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize);
+            historySize = ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE;
+        }
+
+        latencyGauge = new LatencyGauge(this.metrics, historySize, !isSink);
--- End diff --
This looks a bit odd; `latencyGauge = this.metrics.gauge(new LatencyGauge(historySize, isSink), "latency")` would be more consistent with how other metrics are registered.
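A minimal sketch of the registration style suggested above, in plain Java so it runs without Flink on the classpath. `Gauge`, `SimpleMetricGroup`, and the simplified `LatencyGauge` are hypothetical stand-ins, not Flink's classes; the only point illustrated is that the group registers the gauge and hands back the same instance, so the field is assigned in one expression, much like `this.metrics.counter("numRecordsOut")` in the diff. Flink's `MetricGroup.gauge` takes the name as its first argument, so the suggested call would presumably read `this.metrics.gauge("latency", new LatencyGauge(historySize, isSink))`.

```java
import java.util.HashMap;
import java.util.Map;

public class GaugeRegistrationSketch {

    // Hypothetical stand-in for a gauge: a metric whose value is read on demand.
    interface Gauge<T> {
        T getValue();
    }

    // Hypothetical, minimal metric group (not Flink's MetricGroup).
    static final class SimpleMetricGroup {
        private final Map<String, Gauge<?>> gauges = new HashMap<>();

        // Registers the gauge under the given name and returns the same instance,
        // so the caller can register and keep a reference in one expression.
        <T, G extends Gauge<T>> G gauge(String name, G gauge) {
            gauges.put(name, gauge);
            return gauge;
        }

        Gauge<?> get(String name) {
            return gauges.get(name);
        }
    }

    // Hypothetical latency gauge: it no longer receives the metric group in its
    // constructor; registration is left to the group instead.
    static final class LatencyGauge implements Gauge<String> {
        private final int historySize;
        private final boolean isSink;

        LatencyGauge(int historySize, boolean isSink) {
            this.historySize = historySize;
            this.isSink = isSink;
        }

        @Override
        public String getValue() {
            return "historySize=" + historySize + ", isSink=" + isSink;
        }
    }

    public static void main(String[] args) {
        SimpleMetricGroup metrics = new SimpleMetricGroup();
        // Same shape as the suggested call, with the name passed first.
        LatencyGauge latencyGauge = metrics.gauge("latency", new LatencyGauge(128, false));
        System.out.println(latencyGauge == metrics.get("latency"));
        System.out.println(latencyGauge.getValue());
    }
}
```

Constructing the gauge independently of the group keeps it testable on its own and makes every metric in the operator follow the same "create, register, keep reference" pattern.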
> Measure latency of elements and expose it through web interface
> ---------------------------------------------------------------
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Robert Metzger
> Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the
> web interface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp
> to each record at the sources.
> However, this introduces overhead for a monitoring feature that users might
> not even use (8 bytes per element plus a System.currentTimeMillis() call on
> each element).
> Therefore, I suggest implementing this feature by periodically sending
> special events through the topology, similar to watermarks.
> These {{LatencyMarks}} are emitted at the sources at a configurable interval
> and forwarded by the tasks. The sinks will compare the timestamp of each
> latency mark with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but they will be
> delayed similarly to regular records, so their latency will approximate the
> record latency.
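The marker mechanism described above can be sketched as follows, assuming a single simulated clock shared by source and sink (the proposal relies on synchronized clocks). `LatencyMarker`, `MarkerEmitter`, `forwardThroughTask`, and `LatencyHistory` are hypothetical names chosen for illustration, not Flink classes. The source emits a marker only when the configured interval has elapsed, intermediate tasks pass it through unchanged, and the sink records `now - markedTime` in a bounded history, which is the role `historySize` plays in the diff.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LongSummaryStatistics;
import java.util.function.LongSupplier;

public class LatencyMarkerSketch {

    // Hypothetical marker: carries only the time at which the source emitted it.
    static final class LatencyMarker {
        final long markedTimeMillis;

        LatencyMarker(long markedTimeMillis) {
            this.markedTimeMillis = markedTimeMillis;
        }
    }

    // Source side: emits a marker at most once per configured interval.
    static final class MarkerEmitter {
        private final LongSupplier clock;
        private final long intervalMillis;
        private boolean emittedAny = false;
        private long lastEmittedMillis;

        MarkerEmitter(LongSupplier clock, long intervalMillis) {
            this.clock = clock;
            this.intervalMillis = intervalMillis;
        }

        // Returns a new marker if the interval has elapsed, otherwise null.
        LatencyMarker maybeEmit() {
            long now = clock.getAsLong();
            if (emittedAny && now - lastEmittedMillis < intervalMillis) {
                return null;
            }
            emittedAny = true;
            lastEmittedMillis = now;
            return new LatencyMarker(now);
        }
    }

    // Intermediate tasks forward the marker unchanged; they never restamp it.
    static LatencyMarker forwardThroughTask(LatencyMarker marker) {
        return marker;
    }

    // Sink side: keeps the latencies of the last historySize markers.
    static final class LatencyHistory {
        private final LongSupplier clock;
        private final int historySize;
        private final Deque<Long> samples = new ArrayDeque<>();

        LatencyHistory(LongSupplier clock, int historySize) {
            if (historySize <= 0) {
                throw new IllegalArgumentException("historySize must be positive");
            }
            this.clock = clock;
            this.historySize = historySize;
        }

        void onMarker(LatencyMarker marker) {
            long latency = clock.getAsLong() - marker.markedTimeMillis;
            if (samples.size() == historySize) {
                samples.removeFirst();  // drop the oldest sample
            }
            samples.addLast(latency);
        }

        LongSummaryStatistics stats() {
            return samples.stream().mapToLong(Long::longValue).summaryStatistics();
        }
    }

    public static void main(String[] args) {
        long[] now = {0};  // simulated wall clock, shared by source and sink
        LongSupplier clock = () -> now[0];
        MarkerEmitter source = new MarkerEmitter(clock, 100);
        LatencyHistory sink = new LatencyHistory(clock, 3);

        long[] emitAttempts = {0, 50, 100, 200, 300};
        long[] delays = {7, 7, 12, 5, 20};
        for (int i = 0; i < emitAttempts.length; i++) {
            now[0] = emitAttempts[i];
            LatencyMarker marker = source.maybeEmit();
            if (marker == null) {
                continue;  // interval not yet elapsed, nothing emitted
            }
            now[0] = emitAttempts[i] + delays[i];  // time spent travelling through the topology
            sink.onMarker(forwardThroughTask(marker));
        }
        System.out.println(sink.stats());
    }
}
```

Keeping only the last `historySize` samples bounds the memory each sink spends on latency tracking, at the cost of the statistics reflecting only recent markers.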
> The above suggestion assumes that the clocks on all TaskManagers are in sync.
> Otherwise, the measured latencies would also include the offsets between the
> TaskManagers' clocks.
> In a second step, we can try to mitigate this issue by using the JobManager
> as a central timing service. The TaskManagers will periodically query the JM
> for the current time in order to determine the offset from their own clock.
> This offset would still include the network latency between TM and JM, but
> it should nevertheless yield reasonably good estimates.
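One way to implement the JobManager-based correction is Cristian's algorithm, which is chosen here for illustration rather than something the proposal specifies. The TaskManager notes its local time before and after asking the JM for its clock and assumes the JM read its clock halfway through the round trip. Under the assumption of symmetric network delays and no clock drift during the query, this partly compensates for the TM/JM network latency mentioned above, leaving an error of at most half the round-trip time. `JobManagerClock`, `estimateOffset`, and `correctedLatency` are hypothetical names; the JM query is simulated with a lambda.

```java
import java.util.function.LongSupplier;

public class ClockOffsetSketch {

    // Hypothetical remote call that returns the JobManager's current time.
    interface JobManagerClock {
        long currentTimeMillis();
    }

    // Estimated offset (JM clock minus local clock) and the maximum error of that estimate.
    static final class OffsetEstimate {
        final long offsetMillis;
        final long errorBoundMillis;

        OffsetEstimate(long offsetMillis, long errorBoundMillis) {
            this.offsetMillis = offsetMillis;
            this.errorBoundMillis = errorBoundMillis;
        }
    }

    // Cristian-style estimate: assume the JM read its clock halfway through the round trip.
    static OffsetEstimate estimateOffset(LongSupplier localClock, JobManagerClock jm) {
        long t0 = localClock.getAsLong();       // local time when the request is sent
        long jmTime = jm.currentTimeMillis();   // JM time, read at some point between t0 and t1
        long t1 = localClock.getAsLong();       // local time when the reply arrives
        long roundTrip = t1 - t0;
        long midpoint = t0 + roundTrip / 2;
        // The true read time lies in [t0, t1], so the error is at most ceil(roundTrip / 2).
        return new OffsetEstimate(jmTime - midpoint, (roundTrip + 1) / 2);
    }

    // Latency between two TaskManagers once both local timestamps are mapped onto the JM clock.
    static long correctedLatency(long markedLocalAtSource, long sourceOffset,
                                 long receivedLocalAtSink, long sinkOffset) {
        return (receivedLocalAtSink + sinkOffset) - (markedLocalAtSource + sourceOffset);
    }

    public static void main(String[] args) {
        long[] local = {1000};  // simulated TaskManager clock; the JM runs 250 ms ahead
        LongSupplier localClock = () -> local[0];
        JobManagerClock jm = () -> {
            local[0] += 3;                 // request travels to the JM
            long jmNow = local[0] + 250;   // JM reads its own clock
            local[0] += 5;                 // reply travels back
            return jmNow;
        };
        OffsetEstimate e = estimateOffset(localClock, jm);
        System.out.println("offset=" + e.offsetMillis + " ms, +/- " + e.errorBoundMillis + " ms");

        // Source clock runs 100 ms behind the JM, sink clock 40 ms ahead.
        // The raw difference 5055 - 4900 = 155 ms; after correction the latency is 15 ms.
        System.out.println(correctedLatency(4900, 100, 5055, -40));
    }
}
```

In this simulation the estimate is 249 ms against a true offset of 250 ms, within the 4 ms bound derived from the 8 ms round trip.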
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)