This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new 1619f89 [FLINK-11887][metrics] Fix latency drift
1619f89 is described below
commit 1619f89961c86a25b29e6b4d0b54cb187ed3fd91
Author: SuXingLee <[email protected]>
AuthorDate: Fri Mar 22 18:36:55 2019 +0800
[FLINK-11887][metrics] Fix latency drift
---
.../org/apache/flink/streaming/api/operators/StreamSource.java | 2 +-
.../operators/StreamSourceOperatorLatencyMetricsTest.java | 10 +++++++++-
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 1a70d3a..0a3a8cc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -154,7 +154,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> extends Abstract
         public void onProcessingTime(long timestamp) throws Exception {
try {
                 // ProcessingTimeService callbacks are executed under the checkpointing lock
-                output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
+                output.emitLatencyMarker(new LatencyMarker(processingTimeService.getCurrentProcessingTime(), operatorId, subtaskIndex));
} catch (Throwable t) {
                 // we catch the Throwables here so that we don't trigger the processing
                 // timer services async exception handler
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
index fa0e17c..ba4b946 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
@@ -170,6 +170,7 @@ public class StreamSourceOperatorLatencyMetricsTest extends TestLogger {
output.size());
long timestamp = 0L;
+ int expectedLatencyIndex = 0;
int i = 0;
// verify that its only latency markers + a final watermark
@@ -178,7 +179,14 @@ public class StreamSourceOperatorLatencyMetricsTest extends TestLogger {
Assert.assertTrue(se.isLatencyMarker());
                 Assert.assertEquals(operator.getOperatorID(), se.asLatencyMarker().getOperatorId());
                 Assert.assertEquals(0, se.asLatencyMarker().getSubtaskIndex());
-                Assert.assertTrue(se.asLatencyMarker().getMarkedTime() == timestamp);
+
+                // determines the next latency mark that should've been emitted
+                // latency marks are emitted once per latencyMarkInterval,
+                // as a result of which we never emit both 10 and 11
+                while (timestamp > processingTimes.get(expectedLatencyIndex)) {
+                    expectedLatencyIndex++;
+                }
+                Assert.assertEquals(processingTimes.get(expectedLatencyIndex).longValue(), se.asLatencyMarker().getMarkedTime());
timestamp += latencyMarkInterval;
}