zentol commented on a change in pull request #16770:
URL: https://github.com/apache/flink/pull/16770#discussion_r687695305
##########
File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockRecordEmitter.java
##########
@@ -20,20 +20,46 @@
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.IdentityHashMap;
+import java.util.Map;
/**
* A mock {@link RecordEmitter} that works with the {@link MockSplitReader} and {@link
* MockSourceReader}.
*/
-public class MockRecordEmitter implements RecordEmitter<int[], Integer, AtomicInteger> {
+public class MockRecordEmitter implements RecordEmitter<int[], Integer, MockSplitState> {
+ public static final int RECORD_SIZE_IN_BYTES = 10;
+ private final SourceReaderMetricGroup metricGroup;
+ private final Map<MockSplitState, Boolean> knownSplits = new IdentityHashMap<>();
Review comment:
```suggestion
private final Set<MockSplitState> knownSplits = Collections.newSetFromMap(new IdentityHashMap<>());
```
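For context, a runnable sketch (not from the PR; `String` elements stand in for `MockSplitState`) of why an identity-backed `Set` fits here:

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class IdentitySetDemo {
    public static void main(String[] args) {
        // Collections.newSetFromMap backs a Set with the supplied Map; an
        // IdentityHashMap compares keys by reference (==), not equals(), so
        // equal-but-distinct objects stay tracked separately -- useful for
        // mutable split states whose equals()/hashCode() could change mid-read.
        Set<String> knownSplits = Collections.newSetFromMap(new IdentityHashMap<>());
        String a = new String("split-0");
        String b = new String("split-0"); // equal to a, but a different instance
        System.out.println(knownSplits.add(a)); // true  (reference seen for the first time)
        System.out.println(knownSplits.add(b)); // true  (identity set: new reference)
        System.out.println(knownSplits.add(a)); // false (same reference already present)
    }
}
```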
##########
File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
##########
@@ -19,25 +19,166 @@
package org.apache.flink.connector.base.source.reader;
import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.connector.base.source.reader.mocks.MockRecordEmitter;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.junit.Rule;
import org.junit.Test;
+import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isCounter;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isGauge;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
/** IT case for the {@link Source} with a coordinator. */
public class CoordinatedSourceITCase extends AbstractTestBase {
+ @Rule public SharedObjects sharedObjects = SharedObjects.create();
Review comment:
```suggestion
@Rule public final SharedObjects sharedObjects = SharedObjects.create();
```
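For background, not part of the suggestion itself: JUnit 4 requires `@Rule` fields to be public, non-static instance fields, so adding `final` does not affect rule discovery; it only prevents the reference from being reassigned during a test.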
##########
File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitState.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.mocks;
+
+/** The state of the {@link MockSplitReader} reflecting the progress. */
+class MockSplitState {
+ private int index;
+ private final int endIndex;
Review comment:
maybe rename these to *RecordIndex?
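For illustration only (names follow the reviewer's hint, not committed code), the rename might read:

```java
// index of the next record to emit within this split
private int currentRecordIndex;
// index at which the split is exhausted
private final int endRecordIndex;
```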
##########
File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
##########
@@ -19,25 +19,166 @@
package org.apache.flink.connector.base.source.reader;
import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.connector.base.source.reader.mocks.MockRecordEmitter;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.junit.Rule;
import org.junit.Test;
+import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isCounter;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isGauge;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
/** IT case for the {@link Source} with a coordinator. */
public class CoordinatedSourceITCase extends AbstractTestBase {
+ @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
+ // since integration tests depend on wall clock time, use huge lags
+ private static final long EVENTTIME_LAG = Duration.ofDays(100).toMillis();
+ private static final long WATERMARK_LAG = Duration.ofDays(1).toMillis();
+ private static final long EVENTTIME_EPSILON = Duration.ofDays(20).toMillis();
+ // this basically is the time a build is allowed to be frozen before the test fails
+ private static final long WATERMARK_EPSILON = Duration.ofHours(6).toMillis();
+
+ @Test
+ public void testMetrics() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ env.getConfig().setAutoWatermarkInterval(1L);
+
+ int numSplits = 2;
+ int numRecordsPerSplit = 10;
+ MockBaseSource source =
+ new MockBaseSource(numSplits, numRecordsPerSplit, Boundedness.BOUNDED);
+
+ long baseTime = System.currentTimeMillis() - EVENTTIME_LAG;
+ WatermarkStrategy<Integer> strategy =
+ WatermarkStrategy.<Integer>forBoundedOutOfOrderness(
+ Duration.ofMillis(WATERMARK_LAG))
+ .withTimestampAssigner(new LaggingTimestampAssigner(baseTime));
+
+ // make sure all parallel instances have processed the same amount of records before
+ // validating metrics
+ SharedReference<CyclicBarrier> beforeBarrier =
+ sharedObjects.add(new CyclicBarrier(numSplits + 1));
+ SharedReference<CyclicBarrier> afterBarrier =
+ sharedObjects.add(new CyclicBarrier(numSplits + 1));
+ DataStream<Integer> stream =
+ env.fromSource(source, strategy, "TestingSource")
+ .map(
+ i -> {
+ if (i % numRecordsPerSplit == 3
+ || i % numRecordsPerSplit == 9) {
+ beforeBarrier.get().await();
+ afterBarrier.get().await();
+ }
+ return i;
+ });
+ stream.addSink(new DiscardingSink<>());
+ JobClient jobClient = env.executeAsync();
+
+ beforeBarrier.get().await();
+ assertSourceMetrics(4, numRecordsPerSplit, env.getParallelism());
Review comment:
replace magic number
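One possible shape, as a sketch (constant name is hypothetical): the `4` corresponds to the `i % numRecordsPerSplit == 3` guard in the map function, i.e. records 0..3 have been processed per subtask when the first barrier trips, so the value could be derived from that guard:

```java
// records processed per subtask when the first barrier is reached:
// the map() guard fires at i % numRecordsPerSplit == 3, after 4 records
private static final int RECORDS_AT_FIRST_BARRIER = 3 + 1;

beforeBarrier.get().await();
assertSourceMetrics(RECORDS_AT_FIRST_BARRIER, numRecordsPerSplit, env.getParallelism());
```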
##########
File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
##########
@@ -19,25 +19,166 @@
package org.apache.flink.connector.base.source.reader;
import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.connector.base.source.reader.mocks.MockRecordEmitter;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.junit.Rule;
import org.junit.Test;
+import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isCounter;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isGauge;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
/** IT case for the {@link Source} with a coordinator. */
public class CoordinatedSourceITCase extends AbstractTestBase {
+ @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
+ // since integration tests depend on wall clock time, use huge lags
+ private static final long EVENTTIME_LAG = Duration.ofDays(100).toMillis();
+ private static final long WATERMARK_LAG = Duration.ofDays(1).toMillis();
+ private static final long EVENTTIME_EPSILON = Duration.ofDays(20).toMillis();
+ // this basically is the time a build is allowed to be frozen before the test fails
+ private static final long WATERMARK_EPSILON = Duration.ofHours(6).toMillis();
+
+ @Test
+ public void testMetrics() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
Review comment:
add a comment that this should remain higher than the number of splits
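A possible wording, sketched here (the exact comment text is the author's call):

```java
// keep parallelism greater than numSplits so that at least one subtask gets
// no split and the "no split assigned" assertions in assertSourceMetrics run
env.setParallelism(4);
```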
##########
File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
##########
@@ -19,25 +19,166 @@
package org.apache.flink.connector.base.source.reader;
import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.connector.base.source.reader.mocks.MockRecordEmitter;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.junit.Rule;
import org.junit.Test;
+import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isCounter;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isGauge;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
/** IT case for the {@link Source} with a coordinator. */
public class CoordinatedSourceITCase extends AbstractTestBase {
+ @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
+ // since integration tests depend on wall clock time, use huge lags
+ private static final long EVENTTIME_LAG = Duration.ofDays(100).toMillis();
+ private static final long WATERMARK_LAG = Duration.ofDays(1).toMillis();
+ private static final long EVENTTIME_EPSILON = Duration.ofDays(20).toMillis();
+ // this basically is the time a build is allowed to be frozen before the test fails
+ private static final long WATERMARK_EPSILON = Duration.ofHours(6).toMillis();
+
+ @Test
+ public void testMetrics() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ env.getConfig().setAutoWatermarkInterval(1L);
+
+ int numSplits = 2;
+ int numRecordsPerSplit = 10;
+ MockBaseSource source =
+ new MockBaseSource(numSplits, numRecordsPerSplit, Boundedness.BOUNDED);
+
+ long baseTime = System.currentTimeMillis() - EVENTTIME_LAG;
+ WatermarkStrategy<Integer> strategy =
+ WatermarkStrategy.<Integer>forBoundedOutOfOrderness(
+ Duration.ofMillis(WATERMARK_LAG))
+ .withTimestampAssigner(new LaggingTimestampAssigner(baseTime));
+
+ // make sure all parallel instances have processed the same amount of records before
+ // validating metrics
+ SharedReference<CyclicBarrier> beforeBarrier =
+ sharedObjects.add(new CyclicBarrier(numSplits + 1));
+ SharedReference<CyclicBarrier> afterBarrier =
+ sharedObjects.add(new CyclicBarrier(numSplits + 1));
+ DataStream<Integer> stream =
+ env.fromSource(source, strategy, "TestingSource")
+ .map(
+ i -> {
+ if (i % numRecordsPerSplit == 3
+ || i % numRecordsPerSplit == 9) {
+ beforeBarrier.get().await();
+ afterBarrier.get().await();
+ }
+ return i;
+ });
+ stream.addSink(new DiscardingSink<>());
+ JobClient jobClient = env.executeAsync();
+
+ beforeBarrier.get().await();
+ assertSourceMetrics(4, numRecordsPerSplit, env.getParallelism());
+ afterBarrier.get().await();
+
+ beforeBarrier.get().await();
+ assertSourceMetrics(10, numRecordsPerSplit, env.getParallelism());
+ afterBarrier.get().await();
+
+ jobClient.getJobExecutionResult().get();
+ }
+
+ private void assertSourceMetrics(long processedRecords, long numTotal, int parallelism) {
+ List<OperatorMetricGroup> groups =
+ miniClusterResource.getMetricReporter().findOperatorMetricGroups("TestingSource");
+ assertThat(groups, hasSize(parallelism));
+
+ int subtaskWithMetrics = 0;
+ for (OperatorMetricGroup group : groups) {
+ Map<String, Metric> metrics =
+ miniClusterResource.getMetricReporter().getMetricsByGroup(group);
+ // there are only 2 splits assigned; so two groups will not update metrics
+ if (group.getIOMetricGroup().getNumRecordsInCounter().getCount() == 0) {
+ // assert that optional metrics are not initialized when no split assigned
+ assertThat(metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG), nullValue());
+ assertThat(metrics.get(MetricNames.WATERMARK_LAG), nullValue());
+ continue;
+ }
+ subtaskWithMetrics++;
+ // I/O metrics
+ assertThat(
+ group.getIOMetricGroup().getNumRecordsInCounter(),
+ isCounter(equalTo(processedRecords)));
+ assertThat(
+ group.getIOMetricGroup().getNumBytesInCounter(),
+ isCounter(equalTo(processedRecords * MockRecordEmitter.RECORD_SIZE_IN_BYTES)));
+ // MockRecordEmitter is just incrementing errors every even record
+ assertThat(
+ metrics.get(MetricNames.NUM_RECORDS_IN_ERRORS),
+ isCounter(equalTo(processedRecords / 2)));
+ // Timestamp assigner subtracting EVENTTIME_LAG from wall clock, so expect that lag
+ assertThat(
+ metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG),
+ isGauge(
+ both(greaterThan(EVENTTIME_LAG - EVENTTIME_EPSILON))
+ .and(lessThan(EVENTTIME_LAG + EVENTTIME_EPSILON))));
+ // Watermark is derived from timestamp, so it has to be in the same OOM
Review comment:
OOM?
##########
File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
##########
@@ -19,25 +19,166 @@
package org.apache.flink.connector.base.source.reader;
import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.connector.base.source.reader.mocks.MockRecordEmitter;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.junit.Rule;
import org.junit.Test;
+import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isCounter;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isGauge;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
/** IT case for the {@link Source} with a coordinator. */
public class CoordinatedSourceITCase extends AbstractTestBase {
+ @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
+ // since integration tests depend on wall clock time, use huge lags
+ private static final long EVENTTIME_LAG = Duration.ofDays(100).toMillis();
+ private static final long WATERMARK_LAG = Duration.ofDays(1).toMillis();
+ private static final long EVENTTIME_EPSILON = Duration.ofDays(20).toMillis();
+ // this basically is the time a build is allowed to be frozen before the test fails
+ private static final long WATERMARK_EPSILON = Duration.ofHours(6).toMillis();
+
+ @Test
+ public void testMetrics() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ env.getConfig().setAutoWatermarkInterval(1L);
+
+ int numSplits = 2;
+ int numRecordsPerSplit = 10;
+ MockBaseSource source =
+ new MockBaseSource(numSplits, numRecordsPerSplit, Boundedness.BOUNDED);
+
+ long baseTime = System.currentTimeMillis() - EVENTTIME_LAG;
+ WatermarkStrategy<Integer> strategy =
+ WatermarkStrategy.<Integer>forBoundedOutOfOrderness(
+ Duration.ofMillis(WATERMARK_LAG))
+ .withTimestampAssigner(new LaggingTimestampAssigner(baseTime));
+
+ // make sure all parallel instances have processed the same amount of records before
+ // validating metrics
+ SharedReference<CyclicBarrier> beforeBarrier =
+ sharedObjects.add(new CyclicBarrier(numSplits + 1));
+ SharedReference<CyclicBarrier> afterBarrier =
+ sharedObjects.add(new CyclicBarrier(numSplits + 1));
+ DataStream<Integer> stream =
+ env.fromSource(source, strategy, "TestingSource")
+ .map(
+ i -> {
+ if (i % numRecordsPerSplit == 3
+ || i % numRecordsPerSplit == 9) {
+ beforeBarrier.get().await();
+ afterBarrier.get().await();
+ }
+ return i;
+ });
+ stream.addSink(new DiscardingSink<>());
+ JobClient jobClient = env.executeAsync();
+
+ beforeBarrier.get().await();
+ assertSourceMetrics(4, numRecordsPerSplit, env.getParallelism());
+ afterBarrier.get().await();
+
+ beforeBarrier.get().await();
+ assertSourceMetrics(10, numRecordsPerSplit, env.getParallelism());
+ afterBarrier.get().await();
+
+ jobClient.getJobExecutionResult().get();
+ }
+
+ private void assertSourceMetrics(long processedRecords, long numTotal, int parallelism) {
Review comment:
```suggestion
private void assertSourceMetrics(long processedRecordsPerSubtask, long numTotalPerSubtask, int parallelism) {
```
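With the renamed parameters, the first call site might then read, for example:

```java
// per-subtask semantics are now explicit at the call site
assertSourceMetrics(
        /* processedRecordsPerSubtask */ 4,
        /* numTotalPerSubtask */ numRecordsPerSplit,
        env.getParallelism());
```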
##########
File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
##########
@@ -19,25 +19,166 @@
package org.apache.flink.connector.base.source.reader;
import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.connector.base.source.reader.mocks.MockRecordEmitter;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.junit.Rule;
import org.junit.Test;
+import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isCounter;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isGauge;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
/** IT case for the {@link Source} with a coordinator. */
public class CoordinatedSourceITCase extends AbstractTestBase {
+ @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
+ // since integration tests depend on wall clock time, use huge lags
+ private static final long EVENTTIME_LAG = Duration.ofDays(100).toMillis();
+ private static final long WATERMARK_LAG = Duration.ofDays(1).toMillis();
+ private static final long EVENTTIME_EPSILON = Duration.ofDays(20).toMillis();
+ // this basically is the time a build is allowed to be frozen before the test fails
+ private static final long WATERMARK_EPSILON = Duration.ofHours(6).toMillis();
+
+ @Test
+ public void testMetrics() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ env.getConfig().setAutoWatermarkInterval(1L);
+
+ int numSplits = 2;
+ int numRecordsPerSplit = 10;
+ MockBaseSource source =
+ new MockBaseSource(numSplits, numRecordsPerSplit, Boundedness.BOUNDED);
+
+ long baseTime = System.currentTimeMillis() - EVENTTIME_LAG;
+ WatermarkStrategy<Integer> strategy =
+ WatermarkStrategy.<Integer>forBoundedOutOfOrderness(
+ Duration.ofMillis(WATERMARK_LAG))
+ .withTimestampAssigner(new LaggingTimestampAssigner(baseTime));
+
+ // make sure all parallel instances have processed the same amount of records before
+ // validating metrics
+ SharedReference<CyclicBarrier> beforeBarrier =
+ sharedObjects.add(new CyclicBarrier(numSplits + 1));
+ SharedReference<CyclicBarrier> afterBarrier =
+ sharedObjects.add(new CyclicBarrier(numSplits + 1));
+ DataStream<Integer> stream =
+ env.fromSource(source, strategy, "TestingSource")
+ .map(
+ i -> {
+ if (i % numRecordsPerSplit == 3
+ || i % numRecordsPerSplit == 9) {
+ beforeBarrier.get().await();
+ afterBarrier.get().await();
+ }
+ return i;
+ });
+ stream.addSink(new DiscardingSink<>());
+ JobClient jobClient = env.executeAsync();
+
+ beforeBarrier.get().await();
+ assertSourceMetrics(4, numRecordsPerSplit, env.getParallelism());
+ afterBarrier.get().await();
+
+ beforeBarrier.get().await();
+ assertSourceMetrics(10, numRecordsPerSplit, env.getParallelism());
Review comment:
```suggestion
assertSourceMetrics(numRecordsPerSplit, numRecordsPerSplit, env.getParallelism());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]