fapaul commented on a change in pull request #17050:
URL: https://github.com/apache/flink/pull/17050#discussion_r699229651



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java
##########
@@ -17,130 +17,109 @@
 
 package org.apache.flink.test.streaming.api;
 
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.PrintWriter;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests that watermarks are emitted while file is being read, particularly 
the last split.
  *
  * @see <a 
href="https://issues.apache.org/jira/browse/FLINK-19109";>FLINK-19109</a>
  */
-public class FileReadingWatermarkITCase {
-    private static final String NUM_WATERMARKS_ACC_NAME = "numWatermarks";
-    private static final String RUNTIME_ACC_NAME = "runtime";
-    private static final int FILE_SIZE_LINES = 5_000_000;
-    private static final int WATERMARK_INTERVAL_MILLIS = 10;
-    private static final int MIN_EXPECTED_WATERMARKS = 5;
+public class FileReadingWatermarkITCase extends TestLogger {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileReadingWatermarkITCase.class);
+
+    private static final int WATERMARK_INTERVAL_MILLIS = 1_000;
+    private static final int EXPECTED_WATERMARKS = 5;
 
     @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+    private SharedReference<CountDownLatch> latch;
+
+    @Before
+    public void setUp() {
+        latch = sharedObjects.add(new CountDownLatch(EXPECTED_WATERMARKS));
+    }

Review comment:
       Thanks for the suggestion it looks good to me only one thing I was not 
sure about is if the changed `WatermarkStrategy` maybe generates watermarks 
differently and it cannot run in the original issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to