This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7d9d68e634bfa3dc0cc8ac127cc130ee0a83df54
Author: Feng Jin <[email protected]>
AuthorDate: Fri Jul 5 15:56:52 2024 +0800

    [FLINK-35754][e2e] Fix SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error
    
    (cherry picked from commit 140d96c00410d16d56f293b321e73475532878c6)
---
 .../flink/table/gateway/SqlGatewayE2ECase.java     | 13 +++++----
 .../flink/core/testutils/CommonTestUtils.java      | 33 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
index d65ed2f5f5d..0d7387b5803 100644
--- a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
+++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java
@@ -187,7 +187,7 @@ public class SqlGatewayE2ECase extends TestLogger {
             gatewayRestClient.executeStatementWithResult("SET 'execution.runtime-mode' = 'batch'");
 
             // verify the result
-            CommonTestUtils.waitUtil(
+            CommonTestUtils.waitUntilIgnoringExceptions(
                     () -> {
                         List<RowData> result =
                                 gatewayRestClient.executeStatementWithResult(
@@ -219,7 +219,7 @@ public class SqlGatewayE2ECase extends TestLogger {
                     "ALTER MATERIALIZED TABLE my_materialized_table_in_continuous_mode RESUME");
 
             // verify the result
-            CommonTestUtils.waitUtil(
+            CommonTestUtils.waitUntilIgnoringExceptions(
                     () -> {
                         List<RowData> result =
                                 gatewayRestClient.executeStatementWithResult(
@@ -312,7 +312,7 @@ public class SqlGatewayE2ECase extends TestLogger {
 
             // verify the materialized table should auto refresh the today partition or tomorrow
             // partition
-            CommonTestUtils.waitUtil(
+            CommonTestUtils.waitUntilIgnoringExceptions(
                     () -> {
                         List<RowData> result =
                                 gatewayRestClient.executeStatementWithResult(
@@ -345,9 +345,10 @@ public class SqlGatewayE2ECase extends TestLogger {
             gatewayRestClient.executeStatementWithResult(
                     "ALTER MATERIALIZED TABLE my_materialized_table_in_full_mode RESUME");
 
-            // wait until the materialized table is updated and verify only today or tomorrow data
+            // wait until the materialized table is updated and verify only today or tomorrow
+            // data
             // should be updated
-            CommonTestUtils.waitUtil(
+            CommonTestUtils.waitUntilIgnoringExceptions(
                     () -> {
                         List<RowData> result =
                                 gatewayRestClient.executeStatementWithResult(
@@ -378,7 +379,7 @@ public class SqlGatewayE2ECase extends TestLogger {
                             + "')");
 
             // verify the materialized table that all partitions are updated
-            CommonTestUtils.waitUtil(
+            CommonTestUtils.waitUntilIgnoringExceptions(
                     () -> {
                         List<RowData> result =
                                 gatewayRestClient.executeStatementWithResult(
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index 7466648a904..70e474e9e86 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -19,6 +19,8 @@
 package org.apache.flink.core.testutils;
 
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
@@ -42,6 +44,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 /** This class contains reusable utility methods for unit tests. */
 public class CommonTestUtils {
 
+    private static final Logger LOG = LoggerFactory.getLogger(CommonTestUtils.class);
+
     /**
      * Creates a copy of an object via Java Serialization.
      *
@@ -215,6 +219,35 @@ public class CommonTestUtils {
         }
     }
 
+    /**
+     * Wait until the given condition is met or timeout, ignoring any exceptions thrown by the
+     * condition.
+     *
+     * @param condition the condition to wait for.
+     * @param timeout the maximum time to wait for the condition to become true.
+     * @param pause delay between condition checks.
+     * @param errorMsg the error message to include in the <code>TimeoutException</code> if the
+     *     condition was not met before timeout.
+     * @throws TimeoutException if the condition is not met before timeout.
+     * @throws InterruptedException if the thread is interrupted.
+     */
+    @SuppressWarnings("BusyWait")
+    public static void waitUntilIgnoringExceptions(
+            Supplier<Boolean> condition, Duration timeout, Duration pause, String errorMsg)
+            throws TimeoutException, InterruptedException {
+        Supplier<Boolean> safeCondition =
+                () -> {
+                    try {
+                        return condition.get();
+                    } catch (Exception ignored) {
+                        LOG.warn("Exception thrown while evaluating condition", ignored);
+                        return false;
+                    }
+                };
+
+        waitUtil(safeCondition, timeout, pause, errorMsg);
+    }
+
     /**
      * Wait util the given condition is met or timeout.
      *

Reply via email to