This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 976026b  [FLINK-23776][streaming] Fix handling of timestamp-less records in Source metrics.
976026b is described below

commit 976026b63175e63009749936416fc9581a0bf56b
Author: Arvid Heise <[email protected]>
AuthorDate: Tue Aug 17 18:14:45 2021 +0200

    [FLINK-23776][streaming] Fix handling of timestamp-less records in Source metrics.
---
 .../source/reader/CoordinatedSourceITCase.java     | 80 +++++++++++++++-------
 .../groups/InternalSourceReaderMetricGroup.java    | 21 +++---
 .../streaming/api/operators/SourceOperator.java    |  6 +-
 3 files changed, 69 insertions(+), 38 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
index ba0485c..40006f6 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
@@ -75,7 +75,23 @@ public class CoordinatedSourceITCase extends 
AbstractTestBase {
     @Rule public final InMemoryReporterRule inMemoryReporter = 
InMemoryReporterRule.create();
 
     @Test
-    public void testMetrics() throws Exception {
+    public void testMetricsWithTimestamp() throws Exception {
+        long baseTime = System.currentTimeMillis() - EVENTTIME_LAG;
+        WatermarkStrategy<Integer> strategy =
+                WatermarkStrategy.forGenerator(
+                                context -> new 
EagerBoundedOutOfOrdernessWatermarks())
+                        .withTimestampAssigner(new 
LaggingTimestampAssigner(baseTime));
+
+        testMetrics(strategy, true);
+    }
+
+    @Test
+    public void testMetricsWithoutTimestamp() throws Exception {
+        testMetrics(WatermarkStrategy.noWatermarks(), false);
+    }
+
+    private void testMetrics(WatermarkStrategy<Integer> strategy, boolean 
hasTimestamps)
+            throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         int numSplits = Math.max(1, env.getParallelism() - 2);
         env.getConfig().setAutoWatermarkInterval(1L);
@@ -84,12 +100,6 @@ public class CoordinatedSourceITCase extends 
AbstractTestBase {
         MockBaseSource source =
                 new MockBaseSource(numSplits, numRecordsPerSplit, 
Boundedness.BOUNDED);
 
-        long baseTime = System.currentTimeMillis() - EVENTTIME_LAG;
-        WatermarkStrategy<Integer> strategy =
-                WatermarkStrategy.forGenerator(
-                                context -> new 
EagerBoundedOutOfOrdernessWatermarks())
-                        .withTimestampAssigner(new 
LaggingTimestampAssigner(baseTime));
-
         // make sure all parallel instances have processed the same amount of 
records before
         // validating metrics
         SharedReference<CyclicBarrier> beforeBarrier =
@@ -113,11 +123,21 @@ public class CoordinatedSourceITCase extends 
AbstractTestBase {
         JobClient jobClient = env.executeAsync();
 
         beforeBarrier.get().await();
-        assertSourceMetrics(stopAtRecord1 + 1, numRecordsPerSplit, 
env.getParallelism(), numSplits);
+        assertSourceMetrics(
+                stopAtRecord1 + 1,
+                numRecordsPerSplit,
+                env.getParallelism(),
+                numSplits,
+                hasTimestamps);
         afterBarrier.get().await();
 
         beforeBarrier.get().await();
-        assertSourceMetrics(stopAtRecord2 + 1, numRecordsPerSplit, 
env.getParallelism(), numSplits);
+        assertSourceMetrics(
+                stopAtRecord2 + 1,
+                numRecordsPerSplit,
+                env.getParallelism(),
+                numSplits,
+                hasTimestamps);
         afterBarrier.get().await();
 
         jobClient.getJobExecutionResult().get();
@@ -127,7 +147,8 @@ public class CoordinatedSourceITCase extends 
AbstractTestBase {
             long processedRecordsPerSubtask,
             long numTotalPerSubtask,
             int parallelism,
-            int numSplits) {
+            int numSplits,
+            boolean hasTimestamps) {
         List<OperatorMetricGroup> groups =
                 
inMemoryReporter.getReporter().findOperatorMetricGroups("MetricTestingSource");
         assertThat(groups, hasSize(parallelism));
@@ -157,21 +178,30 @@ public class CoordinatedSourceITCase extends 
AbstractTestBase {
             assertThat(
                     metrics.get(MetricNames.NUM_RECORDS_IN_ERRORS),
                     isCounter(equalTo(processedRecordsPerSubtask / 2)));
-            // Timestamp assigner subtracting EVENTTIME_LAG from wall clock
-            assertThat(
-                    metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG),
-                    isGauge(isCloseTo(EVENTTIME_LAG, EVENTTIME_EPSILON)));
-            // Watermark is derived from timestamp, so it has to be in the 
same order of magnitude
-            assertThat(
-                    metrics.get(MetricNames.WATERMARK_LAG),
-                    isGauge(isCloseTo(EVENTTIME_LAG, EVENTTIME_EPSILON)));
-            // Calculate the additional watermark lag (on top of event time 
lag)
-            Long watermarkLag =
-                    ((Gauge<Long>) 
metrics.get(MetricNames.WATERMARK_LAG)).getValue()
-                            - ((Gauge<Long>) 
metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG))
-                                    .getValue();
-            // That should correspond to the out-of-order boundedness
-            assertThat(watermarkLag, isCloseTo(WATERMARK_LAG, 
WATERMARK_EPSILON));
+            if (hasTimestamps) {
+                // Timestamp assigner subtracting EVENTTIME_LAG from wall clock
+                assertThat(
+                        metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG),
+                        isGauge(isCloseTo(EVENTTIME_LAG, EVENTTIME_EPSILON)));
+                // Watermark is derived from timestamp, so it has to be in the 
same order of
+                // magnitude
+                assertThat(
+                        metrics.get(MetricNames.WATERMARK_LAG),
+                        isGauge(isCloseTo(EVENTTIME_LAG, EVENTTIME_EPSILON)));
+                // Calculate the additional watermark lag (on top of event 
time lag)
+                Long watermarkLag =
+                        ((Gauge<Long>) 
metrics.get(MetricNames.WATERMARK_LAG)).getValue()
+                                - ((Gauge<Long>)
+                                                metrics.get(
+                                                        
MetricNames.CURRENT_EMIT_EVENT_TIME_LAG))
+                                        .getValue();
+                // That should correspond to the out-of-order boundedness
+                assertThat(watermarkLag, isCloseTo(WATERMARK_LAG, 
WATERMARK_EPSILON));
+            } else {
+                // assert that optional metrics are not initialized when no 
timestamp assigned
+                
assertThat(metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG), nullValue());
+                assertThat(metrics.get(MetricNames.WATERMARK_LAG), 
nullValue());
+            }
 
             long pendingRecords = numTotalPerSubtask - 
processedRecordsPerSubtask;
             assertThat(metrics.get(MetricNames.PENDING_RECORDS), 
isGauge(equalTo(pendingRecords)));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java
index 256b42f..eaa1cd6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java
@@ -37,13 +37,14 @@ public class InternalSourceReaderMetricGroup extends 
ProxyMetricGroup<MetricGrou
         implements SourceReaderMetricGroup {
 
     public static final long ACTIVE = Long.MAX_VALUE;
-    private static final long UNDEFINED = Long.MIN_VALUE;
 
     private final OperatorIOMetricGroup operatorIOMetricGroup;
     private final Clock clock;
     private final Counter numRecordsInErrors;
-    private long lastWatermark = UNDEFINED;
-    private long lastEventTime = UNDEFINED;
+    private boolean watermarkLagRegistered;
+    private boolean eventTimeLagRegistered;
+    private long lastWatermark;
+    private long lastEventTime;
     private long idleStartTime = ACTIVE;
 
     private InternalSourceReaderMetricGroup(
@@ -94,25 +95,23 @@ public class InternalSourceReaderMetricGroup extends 
ProxyMetricGroup<MetricGrou
     }
 
     public void watermarkEmitted(long watermark) {
+        lastWatermark = watermark;
         // iff a respective source emits a watermark, Flink can provide the watermark lag
-        if (lastWatermark == UNDEFINED) {
-            lastWatermark = watermark;
+        if (!watermarkLagRegistered) {
             parentMetricGroup.gauge(
                     MetricNames.WATERMARK_LAG, () -> 
clock.absoluteTimeMillis() - lastWatermark);
-        } else {
-            lastWatermark = watermark;
+            watermarkLagRegistered = true;
         }
     }
 
     public void eventTimeEmitted(long timestamp) {
+        lastEventTime = timestamp;
         // iff a respective source emits a timestamp, Flink can provide the event lag
-        if (lastEventTime == UNDEFINED) {
-            lastEventTime = timestamp;
+        if (!eventTimeLagRegistered) {
             parentMetricGroup.gauge(
                     MetricNames.CURRENT_EMIT_EVENT_TIME_LAG,
                     () -> getLastEmitTime() - lastEventTime);
-        } else {
-            lastEventTime = timestamp;
+            eventTimeLagRegistered = true;
         }
     }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 3f87ebd..60f00d3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -461,8 +462,9 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         public void emitRecord(StreamRecord<OUT> streamRecord) throws 
Exception {
             output.emitRecord(streamRecord);
             this.sourceMetricGroup.recordEmitted();
-            if (streamRecord.hasTimestamp()) {
-                
this.sourceMetricGroup.eventTimeEmitted(streamRecord.getTimestamp());
+            long timestamp = streamRecord.getTimestamp();
+            if (timestamp != TimestampAssigner.NO_TIMESTAMP) {
+                this.sourceMetricGroup.eventTimeEmitted(timestamp);
             }
         }
 

Reply via email to