AHeise commented on a change in pull request #16770:
URL: https://github.com/apache/flink/pull/16770#discussion_r687868903



##########
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
##########
@@ -19,30 +19,175 @@
 package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.connector.base.source.reader.mocks.MockRecordEmitter;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
 
-import org.hamcrest.Matchers;
+import org.junit.Rule;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.concurrent.CyclicBarrier;
 
+import static org.apache.flink.metrics.testutils.MetricMatchers.isCounter;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isGauge;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /** IT case for the {@link Source} with a coordinator. */
 public class CoordinatedSourceITCase extends AbstractTestBase {
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    // since integration tests depend on wall clock time, use huge lags
+    private static final long EVENTTIME_LAG = Duration.ofDays(100).toMillis();
+    private static final long WATERMARK_LAG = Duration.ofDays(1).toMillis();
+    private static final long EVENTTIME_EPSILON = 
Duration.ofDays(20).toMillis();
+    // this basically is the time a build is allowed to be frozen before the 
test fails
+    private static final long WATERMARK_EPSILON = 
Duration.ofHours(6).toMillis();
+
+    @Test
+    public void testMetrics() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        int numSplits = 2;
+        env.setParallelism(numSplits + 2);
+        env.getConfig().setAutoWatermarkInterval(1L);
+
+        int numRecordsPerSplit = 10;
+        MockBaseSource source =
+                new MockBaseSource(numSplits, numRecordsPerSplit, 
Boundedness.BOUNDED);
+
+        long baseTime = System.currentTimeMillis() - EVENTTIME_LAG;
+        WatermarkStrategy<Integer> strategy =
+                WatermarkStrategy.<Integer>forBoundedOutOfOrderness(
+                                Duration.ofMillis(WATERMARK_LAG))
+                        .withTimestampAssigner(new 
LaggingTimestampAssigner(baseTime));
+
+        // make sure all parallel instances have processed the same amount of 
records before
+        // validating metrics
+        SharedReference<CyclicBarrier> beforeBarrier =
+                sharedObjects.add(new CyclicBarrier(numSplits + 1));
+        SharedReference<CyclicBarrier> afterBarrier =
+                sharedObjects.add(new CyclicBarrier(numSplits + 1));
+        int stopAtRecord1 = 3;
+        int stopAtRecord2 = numRecordsPerSplit - 1;
+        DataStream<Integer> stream =
+                env.fromSource(source, strategy, "TestingSource")
+                        .map(
+                                i -> {
+                                    if (i % numRecordsPerSplit == stopAtRecord1
+                                            || i % numRecordsPerSplit == 
stopAtRecord2) {
+                                        beforeBarrier.get().await();
+                                        afterBarrier.get().await();
+                                    }
+                                    return i;
+                                });
+        stream.addSink(new DiscardingSink<>());
+        JobClient jobClient = env.executeAsync();
+
+        beforeBarrier.get().await();
+        assertSourceMetrics(stopAtRecord1 + 1, numRecordsPerSplit, 
env.getParallelism(), numSplits);
+        afterBarrier.get().await();
+
+        beforeBarrier.get().await();
+        assertSourceMetrics(stopAtRecord2 + 1, numRecordsPerSplit, 
env.getParallelism(), numSplits);
+        afterBarrier.get().await();
+
+        jobClient.getJobExecutionResult().get();
+    }
+
+    private void assertSourceMetrics(
+            long processedRecordsPerSubtask,
+            long numTotalPerSubtask,
+            int parallelism,
+            int numSplits) {
+        List<OperatorMetricGroup> groups =
+                
miniClusterResource.getMetricReporter().findOperatorMetricGroups("TestingSource");
+        assertThat(groups, hasSize(parallelism));
+
+        int subtaskWithMetrics = 0;
+        for (OperatorMetricGroup group : groups) {
+            Map<String, Metric> metrics =
+                    
miniClusterResource.getMetricReporter().getMetricsByGroup(group);
+            // there are only 2 splits assigned; so two groups will not update 
metrics
+            if (group.getIOMetricGroup().getNumRecordsInCounter().getCount() 
== 0) {
+                // assert that optional metrics are not initialized when no 
split assigned
+                
assertThat(metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG), nullValue());
+                assertThat(metrics.get(MetricNames.WATERMARK_LAG), 
nullValue());
+                continue;
+            }
+            subtaskWithMetrics++;
+            // I/O metrics
+            assertThat(
+                    group.getIOMetricGroup().getNumRecordsInCounter(),
+                    isCounter(equalTo(processedRecordsPerSubtask)));
+            assertThat(
+                    group.getIOMetricGroup().getNumBytesInCounter(),
+                    isCounter(
+                            equalTo(
+                                    processedRecordsPerSubtask
+                                            * 
MockRecordEmitter.RECORD_SIZE_IN_BYTES)));
+            // MockRecordEmitter is just incrementing errors every even record
+            assertThat(
+                    metrics.get(MetricNames.NUM_RECORDS_IN_ERRORS),
+                    isCounter(equalTo(processedRecordsPerSubtask / 2)));
+            // Timestamp assigner subtracting EVENTTIME_LAG from wall clock, 
so expect that lag

Review comment:
       Probably just bad English; I'll try to fix it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to