This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 1619f89  [FLINK-11887][metrics] Fix latency drift
1619f89 is described below

commit 1619f89961c86a25b29e6b4d0b54cb187ed3fd91
Author: SuXingLee <[email protected]>
AuthorDate: Fri Mar 22 18:36:55 2019 +0800

    [FLINK-11887][metrics] Fix latency drift
---
 .../org/apache/flink/streaming/api/operators/StreamSource.java |  2 +-
 .../operators/StreamSourceOperatorLatencyMetricsTest.java      | 10 +++++++++-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 1a70d3a..0a3a8cc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -154,7 +154,7 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>> extends Abstract
                                        public void onProcessingTime(long 
timestamp) throws Exception {
                                                try {
                                                        // 
ProcessingTimeService callbacks are executed under the checkpointing lock
-                                                       
output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, 
subtaskIndex));
+                                                       
output.emitLatencyMarker(new 
LatencyMarker(processingTimeService.getCurrentProcessingTime(), operatorId, 
subtaskIndex));
                                                } catch (Throwable t) {
                                                        // we catch the 
Throwables here so that we don't trigger the processing
                                                        // timer services async 
exception handler
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
index fa0e17c..ba4b946 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
@@ -170,6 +170,7 @@ public class StreamSourceOperatorLatencyMetricsTest extends 
TestLogger {
                        output.size());
 
                long timestamp = 0L;
+               int expectedLatencyIndex = 0;
 
                int i = 0;
                // verify that its only latency markers + a final watermark
@@ -178,7 +179,14 @@ public class StreamSourceOperatorLatencyMetricsTest 
extends TestLogger {
                        Assert.assertTrue(se.isLatencyMarker());
                        Assert.assertEquals(operator.getOperatorID(), 
se.asLatencyMarker().getOperatorId());
                        Assert.assertEquals(0, 
se.asLatencyMarker().getSubtaskIndex());
-                       Assert.assertTrue(se.asLatencyMarker().getMarkedTime() 
== timestamp);
+
+                       // determines the next latency mark that should've been 
emitted
+                       // latency marks are emitted once per 
latencyMarkInterval,
+                       // as a result of which we never emit both 10 and 11
+                       while (timestamp > 
processingTimes.get(expectedLatencyIndex)) {
+                               expectedLatencyIndex++;
+                       }
+                       
Assert.assertEquals(processingTimes.get(expectedLatencyIndex).longValue(), 
se.asLatencyMarker().getMarkedTime());
 
                        timestamp += latencyMarkInterval;
                }

Reply via email to