This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.7 by this push:
new c655c3b [FLINK-11887][metrics] Fix latency drift
c655c3b is described below
commit c655c3b275c927277799dc3ec52dcc7637d6c132
Author: SuXingLee <[email protected]>
AuthorDate: Fri Mar 22 18:36:55 2019 +0800
[FLINK-11887][metrics] Fix latency drift
---
.../org/apache/flink/streaming/api/operators/StreamSource.java | 2 +-
.../operators/StreamSourceOperatorLatencyMetricsTest.java | 10 +++++++++-
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 63dd3e4..f09d83e 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -155,7 +155,7 @@ public class StreamSource<OUT, SRC extends
SourceFunction<OUT>>
public void onProcessingTime(long
timestamp) throws Exception {
try {
//
ProcessingTimeService callbacks are executed under the checkpointing lock
-
output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId,
subtaskIndex));
+
output.emitLatencyMarker(new
LatencyMarker(processingTimeService.getCurrentProcessingTime(), operatorId,
subtaskIndex));
} catch (Throwable t) {
// we catch the
Throwables here so that we don't trigger the processing
// timer services async
exception handler
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
index 14d5147..dbcfc0d 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
@@ -175,6 +175,7 @@ public class StreamSourceOperatorLatencyMetricsTest extends
TestLogger {
output.size());
long timestamp = 0L;
+ int expectedLatencyIndex = 0;
int i = 0;
// verify that its only latency markers + a final watermark
@@ -183,7 +184,14 @@ public class StreamSourceOperatorLatencyMetricsTest
extends TestLogger {
Assert.assertTrue(se.isLatencyMarker());
Assert.assertEquals(operator.getOperatorID(),
se.asLatencyMarker().getOperatorId());
Assert.assertEquals(0,
se.asLatencyMarker().getSubtaskIndex());
- Assert.assertTrue(se.asLatencyMarker().getMarkedTime()
== timestamp);
+
+ // determines the next latency mark that should've been
emitted
+ // latency marks are emitted once per
latencyMarkInterval,
+ // as a result of which we never emit both 10 and 11
+ while (timestamp >
processingTimes.get(expectedLatencyIndex)) {
+ expectedLatencyIndex++;
+ }
+
Assert.assertEquals(processingTimes.get(expectedLatencyIndex).longValue(),
se.asLatencyMarker().getMarkedTime());
timestamp += latencyMarkInterval;
}