fapaul commented on a change in pull request #17601:
URL: https://github.com/apache/flink/pull/17601#discussion_r786811633



##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
##########
@@ -679,106 +729,165 @@ private void verifyEncoderSubject(
 
     @Test
     public void testSourceTableWithTopicAndTopicPattern() {
-        thrown.expect(ValidationException.class);
-        thrown.expect(
-                containsCause(
-                        new ValidationException(
-                                "Option 'topic' and 'topic-pattern' shouldn't 
be set together.")));
-
-        final Map<String, String> modifiedOptions =
-                getModifiedOptions(
-                        getBasicSourceOptions(),
-                        options -> {
-                            options.put("topic", TOPICS);
-                            options.put("topic-pattern", TOPIC_REGEX);
-                        });
-
-        createTableSource(SCHEMA, modifiedOptions);
+        assertThatThrownBy(
+                        () -> {
+                            final Map<String, String> modifiedOptions =
+                                    getModifiedOptions(
+                                            getBasicSourceOptions(),
+                                            options -> {
+                                                options.put("topic", TOPICS);
+                                                options.put("topic-pattern", 
TOPIC_REGEX);
+                                            });
+
+                            createTableSource(SCHEMA, modifiedOptions);
+                        })
+                .isInstanceOf(ValidationException.class)
+                .satisfies(
+                        throwable ->
+                                assertThatChainOfCauses(throwable)
+                                        .anySatisfy(
+                                                cause ->
+                                                        assertThat(cause)
+                                                                .isInstanceOf(
+                                                                        
ValidationException.class)
+                                                                .hasMessage(
+                                                                        
"Option 'topic' and 'topic-pattern' shouldn't be set together.")));

Review comment:
       For the nested exceptions, you can use Flink's util 
`ExceptionUtils#findThrowable` or `FlinkAssertions.anyCauseMatches()`

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
##########
@@ -74,23 +77,40 @@
     }
 
     public static void waitingExpectedResults(
-            String sinkName, List<String> expected, Duration timeout) throws 
InterruptedException {
-        long now = System.currentTimeMillis();
-        long stop = now + timeout.toMillis();
+            String sinkName, List<String> expected, Duration timeout)
+            throws InterruptedException, TimeoutException {
         Collections.sort(expected);
-        while (System.currentTimeMillis() < stop) {
-            List<String> actual = TestValuesTableFactory.getResults(sinkName);
-            Collections.sort(actual);
-            if (expected.equals(actual)) {
-                return;
-            }
-            Thread.sleep(100);
-        }
+        CommonTestUtils.waitUtil(
+                () -> {
+                    List<String> actual = 
TestValuesTableFactory.getResults(sinkName);
+                    Collections.sort(actual);
+                    return expected.equals(actual);
+                },
+                timeout,
+                "Can not get the expected result.");
+    }
 
-        // timeout, assert again
-        List<String> actual = TestValuesTableFactory.getResults(sinkName);
-        Collections.sort(actual);
-        assertEquals(expected, actual);
+    public static void checkReturnEmptyResults(String sinkName, Duration 
timeout)
+            throws InterruptedException, TimeoutException {
+        CommonTestUtils.waitUtil(
+                new Supplier<Boolean>() {

Review comment:
       I am not the biggest fan of anonymous classes including complex logic, 
can you extract it? WDYT about adding a common helper to `CommonTestUtils` that 
combines `waitUntil` and your count-based logic?
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to