AHeise commented on a change in pull request #17050: URL: https://github.com/apache/flink/pull/17050#discussion_r699178562
########## File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java ########## @@ -17,130 +17,109 @@ package org.apache.flink.test.streaming.api; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.accumulators.IntCounter; -import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.eventtime.Watermark; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.util.Preconditions; +import org.apache.flink.testutils.junit.SharedObjects; +import org.apache.flink.testutils.junit.SharedReference; +import org.apache.flink.util.TestLogger; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; -import java.io.PrintWriter; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkState; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Tests that watermarks are emitted while file is being read, particularly the last split. * * @see <a href="https://issues.apache.org/jira/browse/FLINK-19109">FLINK-19109</a> */ -public class FileReadingWatermarkITCase { - private static final String NUM_WATERMARKS_ACC_NAME = "numWatermarks"; - private static final String RUNTIME_ACC_NAME = "runtime"; - private static final int FILE_SIZE_LINES = 5_000_000; - private static final int WATERMARK_INTERVAL_MILLIS = 10; - private static final int MIN_EXPECTED_WATERMARKS = 5; +public class FileReadingWatermarkITCase extends TestLogger { + private static final Logger LOG = LoggerFactory.getLogger(FileReadingWatermarkITCase.class); + + private static final int WATERMARK_INTERVAL_MILLIS = 1_000; + private static final int EXPECTED_WATERMARKS = 5; @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule public final SharedObjects sharedObjects = SharedObjects.create(); + private SharedReference<CountDownLatch> latch; + + @Before + public void setUp() { + latch = sharedObjects.add(new CountDownLatch(EXPECTED_WATERMARKS)); + } Review comment: Here is a bigger rewrite of the test that removes a few extra classes/fields and works around deprecations. I can't attach that to the whole file though. Ping me offline if you want to have more detailed feedback. ```suggestion @Rule public final SharedObjects sharedObjects = SharedObjects.create(); /** * Adds an infinite split that causes the input of {@link * org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator} to instantly go * idle while data is still being processed. * * <p>Before FLINK-19109, watermarks would not be emitted at this point. */ @Test public void testWatermarkEmissionWithChaining() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); env.getConfig().setAutoWatermarkInterval(WATERMARK_INTERVAL_MILLIS); SharedReference<CountDownLatch> latch = sharedObjects.add(new CountDownLatch(EXPECTED_WATERMARKS)); checkState(env.isChainingEnabled()); env.createInput(new InfiniteIntegerInputFormat(true)) .assignTimestampsAndWatermarks( WatermarkStrategy.<Integer>forMonotonousTimestamps() .withTimestampAssigner(context -> getExtractorAssigner())) .addSink(getWatermarkCounter(latch)); env.executeAsync(); latch.get().await(); } private static TimestampAssigner<Integer> getExtractorAssigner() { return new TimestampAssigner<Integer>() { private long counter = 1; @Override public long extractTimestamp(Integer element, long recordTimestamp) { return counter++; } }; } private static SinkFunction<Integer> getWatermarkCounter( final SharedReference<CountDownLatch> latch) { return new RichSinkFunction<Integer>() { @Override public void invoke(Integer value, SinkFunction.Context context) { try { Thread.sleep(1000); LOG.info("Sink received record"); } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public void writeWatermark(Watermark watermark) { LOG.info("Sink received watermark {}", watermark); latch.get().countDown(); } }; } ``` ########## File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java ########## @@ -17,130 +17,109 @@ package org.apache.flink.test.streaming.api; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.accumulators.IntCounter; -import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.eventtime.Watermark; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.util.Preconditions; +import org.apache.flink.testutils.junit.SharedObjects; +import org.apache.flink.testutils.junit.SharedReference; +import org.apache.flink.util.TestLogger; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; -import java.io.PrintWriter; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkState; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Tests that watermarks are emitted while file is being read, particularly the last split. * * @see <a href="https://issues.apache.org/jira/browse/FLINK-19109">FLINK-19109</a> */ -public class FileReadingWatermarkITCase { - private static final String NUM_WATERMARKS_ACC_NAME = "numWatermarks"; - private static final String RUNTIME_ACC_NAME = "runtime"; - private static final int FILE_SIZE_LINES = 5_000_000; - private static final int WATERMARK_INTERVAL_MILLIS = 10; - private static final int MIN_EXPECTED_WATERMARKS = 5; +public class FileReadingWatermarkITCase extends TestLogger { + private static final Logger LOG = LoggerFactory.getLogger(FileReadingWatermarkITCase.class); + + private static final int WATERMARK_INTERVAL_MILLIS = 1_000; + private static final int EXPECTED_WATERMARKS = 5; @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule public final SharedObjects sharedObjects = SharedObjects.create(); + private SharedReference<CountDownLatch> latch; + + @Before + public void setUp() { + latch = sharedObjects.add(new CountDownLatch(EXPECTED_WATERMARKS)); + } Review comment: When I checked the execution of the test I haven't noticed any difference. According to docs, it is also meant as a replacement for the deprecated `AssignerWith*Watermarks`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org