ruanhang1993 commented on a change in pull request #17601:
URL: https://github.com/apache/flink/pull/17601#discussion_r787369372



##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
##########
@@ -74,23 +77,40 @@
     }
 
     public static void waitingExpectedResults(
-            String sinkName, List<String> expected, Duration timeout) throws InterruptedException {
-        long now = System.currentTimeMillis();
-        long stop = now + timeout.toMillis();
+            String sinkName, List<String> expected, Duration timeout)
+            throws InterruptedException, TimeoutException {
         Collections.sort(expected);
-        while (System.currentTimeMillis() < stop) {
-            List<String> actual = TestValuesTableFactory.getResults(sinkName);
-            Collections.sort(actual);
-            if (expected.equals(actual)) {
-                return;
-            }
-            Thread.sleep(100);
-        }
+        CommonTestUtils.waitUtil(
+                () -> {
+                    List<String> actual = TestValuesTableFactory.getResults(sinkName);
+                    Collections.sort(actual);
+                    return expected.equals(actual);
+                },
+                timeout,
+                "Can not get the expected result.");
+    }
 
-        // timeout, assert again
-        List<String> actual = TestValuesTableFactory.getResults(sinkName);
-        Collections.sort(actual);
-        assertEquals(expected, actual);
+    public static void checkReturnEmptyResults(String sinkName, Duration timeout)
+            throws InterruptedException, TimeoutException {
+        CommonTestUtils.waitUtil(
+                new Supplier<Boolean>() {

Review comment:
Yes, I agree that waiting and then checking for an empty result is not a good
way to test this situation. I have changed the test to write some data after
the job has started and then check for that new data, which confirms that we
read from the latest offsets.
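
A rough sketch of that idea, in plain Java with an in-memory list standing in for the Kafka topic. The names `LatestOffsetSketch`, `waitUntil`, and `readFromLatest` are hypothetical and only for illustration; this is not the actual test code in the PR, and the real test goes through the table connector instead.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class LatestOffsetSketch {

    /** Polls the condition until it holds, or throws once the timeout has expired. */
    public static void waitUntil(Supplier<Boolean> condition, Duration timeout, String errorMsg)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.get()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException(errorMsg);
            }
            Thread.sleep(10);
        }
    }

    /** Hypothetical stand-in for a source that only sees records at or after startOffset. */
    public static List<String> readFromLatest(List<String> topic, int startOffset) {
        return new ArrayList<>(topic.subList(startOffset, topic.size()));
    }

    public static void main(String[] args) throws Exception {
        // Records that already exist before the "job" starts.
        List<String> topic = new ArrayList<>(List.of("old-1", "old-2"));

        // The job starts here and remembers the latest offset.
        int latestOffset = topic.size();

        // Write new data only after the job has started.
        topic.add("new-1");
        topic.add("new-2");

        // A positive check: exactly the new records must show up, none of the old ones.
        List<String> expected = List.of("new-1", "new-2");
        waitUntil(
                () -> readFromLatest(topic, latestOffset).equals(expected),
                Duration.ofSeconds(5),
                "Can not get the expected result.");
        System.out.println(readFromLatest(topic, latestOffset));
    }
}
```

Checking for the presence of the new records gives the test something definite to wait for, whereas an empty result cannot be told apart from a job that has not produced anything yet.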





