AHeise commented on a change in pull request #17050: URL: https://github.com/apache/flink/pull/17050#discussion_r699242679
########## File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java ########## @@ -17,130 +17,109 @@ package org.apache.flink.test.streaming.api; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.accumulators.IntCounter; -import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.eventtime.Watermark; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.util.Preconditions; +import org.apache.flink.testutils.junit.SharedObjects; +import org.apache.flink.testutils.junit.SharedReference; +import org.apache.flink.util.TestLogger; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; -import java.io.PrintWriter; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkState; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Tests that watermarks are emitted while file is being read, particularly the last split. 
* * @see <a href="https://issues.apache.org/jira/browse/FLINK-19109">FLINK-19109</a> */ -public class FileReadingWatermarkITCase { - private static final String NUM_WATERMARKS_ACC_NAME = "numWatermarks"; - private static final String RUNTIME_ACC_NAME = "runtime"; - private static final int FILE_SIZE_LINES = 5_000_000; - private static final int WATERMARK_INTERVAL_MILLIS = 10; - private static final int MIN_EXPECTED_WATERMARKS = 5; +public class FileReadingWatermarkITCase extends TestLogger { + private static final Logger LOG = LoggerFactory.getLogger(FileReadingWatermarkITCase.class); + + private static final int WATERMARK_INTERVAL_MILLIS = 1_000; + private static final int EXPECTED_WATERMARKS = 5; @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule public final SharedObjects sharedObjects = SharedObjects.create(); + private SharedReference<CountDownLatch> latch; + + @Before + public void setUp() { + latch = sharedObjects.add(new CountDownLatch(EXPECTED_WATERMARKS)); + } Review comment: When I checked the execution of the test, I didn't notice any difference. According to the docs, it is also meant as a replacement for the deprecated `AssignerWith*Watermarks`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org