This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 976026b [FLINK-23776][streaming] Fix handling of timestamp-less records in Source metrics.
976026b is described below
commit 976026b63175e63009749936416fc9581a0bf56b
Author: Arvid Heise <[email protected]>
AuthorDate: Tue Aug 17 18:14:45 2021 +0200
[FLINK-23776][streaming] Fix handling of timestamp-less records in Source metrics.
---
.../source/reader/CoordinatedSourceITCase.java | 80 +++++++++++++++-------
.../groups/InternalSourceReaderMetricGroup.java | 21 +++---
.../streaming/api/operators/SourceOperator.java | 6 +-
3 files changed, 69 insertions(+), 38 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
index ba0485c..40006f6 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
@@ -75,7 +75,23 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
@Rule public final InMemoryReporterRule inMemoryReporter =
InMemoryReporterRule.create();
@Test
- public void testMetrics() throws Exception {
+ public void testMetricsWithTimestamp() throws Exception {
+ long baseTime = System.currentTimeMillis() - EVENTTIME_LAG;
+ WatermarkStrategy<Integer> strategy =
+ WatermarkStrategy.forGenerator(
+ context -> new
EagerBoundedOutOfOrdernessWatermarks())
+ .withTimestampAssigner(new
LaggingTimestampAssigner(baseTime));
+
+ testMetrics(strategy, true);
+ }
+
+ @Test
+ public void testMetricsWithoutTimestamp() throws Exception {
+ testMetrics(WatermarkStrategy.noWatermarks(), false);
+ }
+
+ private void testMetrics(WatermarkStrategy<Integer> strategy, boolean
hasTimestamps)
+ throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
int numSplits = Math.max(1, env.getParallelism() - 2);
env.getConfig().setAutoWatermarkInterval(1L);
@@ -84,12 +100,6 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
MockBaseSource source =
new MockBaseSource(numSplits, numRecordsPerSplit,
Boundedness.BOUNDED);
- long baseTime = System.currentTimeMillis() - EVENTTIME_LAG;
- WatermarkStrategy<Integer> strategy =
- WatermarkStrategy.forGenerator(
- context -> new
EagerBoundedOutOfOrdernessWatermarks())
- .withTimestampAssigner(new
LaggingTimestampAssigner(baseTime));
-
// make sure all parallel instances have processed the same amount of
records before
// validating metrics
SharedReference<CyclicBarrier> beforeBarrier =
@@ -113,11 +123,21 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
JobClient jobClient = env.executeAsync();
beforeBarrier.get().await();
- assertSourceMetrics(stopAtRecord1 + 1, numRecordsPerSplit,
env.getParallelism(), numSplits);
+ assertSourceMetrics(
+ stopAtRecord1 + 1,
+ numRecordsPerSplit,
+ env.getParallelism(),
+ numSplits,
+ hasTimestamps);
afterBarrier.get().await();
beforeBarrier.get().await();
- assertSourceMetrics(stopAtRecord2 + 1, numRecordsPerSplit,
env.getParallelism(), numSplits);
+ assertSourceMetrics(
+ stopAtRecord2 + 1,
+ numRecordsPerSplit,
+ env.getParallelism(),
+ numSplits,
+ hasTimestamps);
afterBarrier.get().await();
jobClient.getJobExecutionResult().get();
@@ -127,7 +147,8 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
long processedRecordsPerSubtask,
long numTotalPerSubtask,
int parallelism,
- int numSplits) {
+ int numSplits,
+ boolean hasTimestamps) {
List<OperatorMetricGroup> groups =
inMemoryReporter.getReporter().findOperatorMetricGroups("MetricTestingSource");
assertThat(groups, hasSize(parallelism));
@@ -157,21 +178,30 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
assertThat(
metrics.get(MetricNames.NUM_RECORDS_IN_ERRORS),
isCounter(equalTo(processedRecordsPerSubtask / 2)));
- // Timestamp assigner subtracting EVENTTIME_LAG from wall clock
- assertThat(
- metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG),
- isGauge(isCloseTo(EVENTTIME_LAG, EVENTTIME_EPSILON)));
- // Watermark is derived from timestamp, so it has to be in the
same order of magnitude
- assertThat(
- metrics.get(MetricNames.WATERMARK_LAG),
- isGauge(isCloseTo(EVENTTIME_LAG, EVENTTIME_EPSILON)));
- // Calculate the additional watermark lag (on top of event time
lag)
- Long watermarkLag =
- ((Gauge<Long>)
metrics.get(MetricNames.WATERMARK_LAG)).getValue()
- - ((Gauge<Long>)
metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG))
- .getValue();
- // That should correspond to the out-of-order boundedness
- assertThat(watermarkLag, isCloseTo(WATERMARK_LAG,
WATERMARK_EPSILON));
+ if (hasTimestamps) {
+ // Timestamp assigner subtracting EVENTTIME_LAG from wall clock
+ assertThat(
+ metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG),
+ isGauge(isCloseTo(EVENTTIME_LAG, EVENTTIME_EPSILON)));
+ // Watermark is derived from timestamp, so it has to be in the
same order of
+ // magnitude
+ assertThat(
+ metrics.get(MetricNames.WATERMARK_LAG),
+ isGauge(isCloseTo(EVENTTIME_LAG, EVENTTIME_EPSILON)));
+ // Calculate the additional watermark lag (on top of event
time lag)
+ Long watermarkLag =
+ ((Gauge<Long>)
metrics.get(MetricNames.WATERMARK_LAG)).getValue()
+ - ((Gauge<Long>)
+ metrics.get(
+
MetricNames.CURRENT_EMIT_EVENT_TIME_LAG))
+ .getValue();
+ // That should correspond to the out-of-order boundedness
+ assertThat(watermarkLag, isCloseTo(WATERMARK_LAG,
WATERMARK_EPSILON));
+ } else {
+ // assert that optional metrics are not initialized when no
timestamp assigned
+
assertThat(metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG), nullValue());
+ assertThat(metrics.get(MetricNames.WATERMARK_LAG),
nullValue());
+ }
long pendingRecords = numTotalPerSubtask -
processedRecordsPerSubtask;
assertThat(metrics.get(MetricNames.PENDING_RECORDS),
isGauge(equalTo(pendingRecords)));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java
index 256b42f..eaa1cd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java
@@ -37,13 +37,14 @@ public class InternalSourceReaderMetricGroup extends ProxyMetricGroup<MetricGrou
implements SourceReaderMetricGroup {
public static final long ACTIVE = Long.MAX_VALUE;
- private static final long UNDEFINED = Long.MIN_VALUE;
private final OperatorIOMetricGroup operatorIOMetricGroup;
private final Clock clock;
private final Counter numRecordsInErrors;
- private long lastWatermark = UNDEFINED;
- private long lastEventTime = UNDEFINED;
+ private boolean watermarkLagRegistered;
+ private boolean eventTimeLagRegistered;
+ private long lastWatermark;
+ private long lastEventTime;
private long idleStartTime = ACTIVE;
private InternalSourceReaderMetricGroup(
@@ -94,25 +95,23 @@ public class InternalSourceReaderMetricGroup extends ProxyMetricGroup<MetricGrou
}
public void watermarkEmitted(long watermark) {
+ lastWatermark = watermark;
// iff a respective source emits a watermark, Flink can provide the
watermark lag
- if (lastWatermark == UNDEFINED) {
- lastWatermark = watermark;
+ if (!watermarkLagRegistered) {
parentMetricGroup.gauge(
MetricNames.WATERMARK_LAG, () ->
clock.absoluteTimeMillis() - lastWatermark);
- } else {
- lastWatermark = watermark;
+ watermarkLagRegistered = true;
}
}
public void eventTimeEmitted(long timestamp) {
+ lastEventTime = timestamp;
// iff a respective source emits a timestamp, Flink can provide the
event lag
- if (lastEventTime == UNDEFINED) {
- lastEventTime = timestamp;
+ if (!eventTimeLagRegistered) {
parentMetricGroup.gauge(
MetricNames.CURRENT_EMIT_EVENT_TIME_LAG,
() -> getLastEmitTime() - lastEventTime);
- } else {
- lastEventTime = timestamp;
+ eventTimeLagRegistered = true;
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 3f87ebd..60f00d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -461,8 +462,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
public void emitRecord(StreamRecord<OUT> streamRecord) throws
Exception {
output.emitRecord(streamRecord);
this.sourceMetricGroup.recordEmitted();
- if (streamRecord.hasTimestamp()) {
-
this.sourceMetricGroup.eventTimeEmitted(streamRecord.getTimestamp());
+ long timestamp = streamRecord.getTimestamp();
+ if (timestamp != TimestampAssigner.NO_TIMESTAMP) {
+ this.sourceMetricGroup.eventTimeEmitted(timestamp);
}
}