AHeise commented on a change in pull request #16770:
URL: https://github.com/apache/flink/pull/16770#discussion_r687731923
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
##########
@@ -19,25 +19,166 @@
package org.apache.flink.connector.base.source.reader;
import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.connector.base.source.reader.mocks.MockRecordEmitter;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.junit.Rule;
import org.junit.Test;
+import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isCounter;
+import static org.apache.flink.metrics.testutils.MetricMatchers.isGauge;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
/** IT case for the {@link Source} with a coordinator. */
public class CoordinatedSourceITCase extends AbstractTestBase {
+ @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
+ // since integration tests depend on wall clock time, use huge lags
+ private static final long EVENTTIME_LAG = Duration.ofDays(100).toMillis();
+ private static final long WATERMARK_LAG = Duration.ofDays(1).toMillis();
+ private static final long EVENTTIME_EPSILON =
Duration.ofDays(20).toMillis();
+ // this basically is the time a build is allowed to be frozen before the
test fails
+ private static final long WATERMARK_EPSILON =
Duration.ofHours(6).toMillis();
+
+ @Test
+ public void testMetrics() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
Review comment:
I solved it by setting it to `numSplits + 2`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]