This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch release-1.20 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7d9d68e634bfa3dc0cc8ac127cc130ee0a83df54 Author: Feng Jin <[email protected]> AuthorDate: Fri Jul 5 15:56:52 2024 +0800 [FLINK-35754][e2e] Fix SqlGatewayE2ECase.testMaterializedTableInFullMode failed due to Internal Server Error (cherry picked from commit 140d96c00410d16d56f293b321e73475532878c6) --- .../flink/table/gateway/SqlGatewayE2ECase.java | 13 +++++---- .../flink/core/testutils/CommonTestUtils.java | 33 ++++++++++++++++++++++ 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java index d65ed2f5f5d..0d7387b5803 100644 --- a/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java +++ b/flink-end-to-end-tests/flink-sql-gateway-test/src/test/java/org/apache/flink/table/gateway/SqlGatewayE2ECase.java @@ -187,7 +187,7 @@ public class SqlGatewayE2ECase extends TestLogger { gatewayRestClient.executeStatementWithResult("SET 'execution.runtime-mode' = 'batch'"); // verify the result - CommonTestUtils.waitUtil( + CommonTestUtils.waitUntilIgnoringExceptions( () -> { List<RowData> result = gatewayRestClient.executeStatementWithResult( @@ -219,7 +219,7 @@ public class SqlGatewayE2ECase extends TestLogger { "ALTER MATERIALIZED TABLE my_materialized_table_in_continuous_mode RESUME"); // verify the result - CommonTestUtils.waitUtil( + CommonTestUtils.waitUntilIgnoringExceptions( () -> { List<RowData> result = gatewayRestClient.executeStatementWithResult( @@ -312,7 +312,7 @@ public class SqlGatewayE2ECase extends TestLogger { // verify the materialized table should auto refresh the today partition or tomorrow // partition - CommonTestUtils.waitUtil( + CommonTestUtils.waitUntilIgnoringExceptions( () -> { List<RowData> result = gatewayRestClient.executeStatementWithResult( @@ 
-345,9 +345,10 @@ public class SqlGatewayE2ECase extends TestLogger { gatewayRestClient.executeStatementWithResult( "ALTER MATERIALIZED TABLE my_materialized_table_in_full_mode RESUME"); - // wait until the materialized table is updated and verify only today or tomorrow data + // wait until the materialized table is updated and verify only today or tomorrow + // data // should be updated - CommonTestUtils.waitUtil( + CommonTestUtils.waitUntilIgnoringExceptions( () -> { List<RowData> result = gatewayRestClient.executeStatementWithResult( @@ -378,7 +379,7 @@ public class SqlGatewayE2ECase extends TestLogger { + "')"); // verify the materialized table that all partitions are updated - CommonTestUtils.waitUtil( + CommonTestUtils.waitUntilIgnoringExceptions( () -> { List<RowData> result = gatewayRestClient.executeStatementWithResult( diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java index 7466648a904..70e474e9e86 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java @@ -19,6 +19,8 @@ package org.apache.flink.core.testutils; import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedWriter; import java.io.ByteArrayInputStream; @@ -42,6 +44,8 @@ import static org.hamcrest.MatcherAssert.assertThat; /** This class contains reusable utility methods for unit tests. */ public class CommonTestUtils { + private static final Logger LOG = LoggerFactory.getLogger(CommonTestUtils.class); + /** * Creates a copy of an object via Java Serialization. 
* @@ -215,6 +219,35 @@ public class CommonTestUtils { } } + /** + * Wait until the given condition is met or timeout, ignoring any exceptions thrown by the + * condition. + * + * @param condition the condition to wait for. + * @param timeout the maximum time to wait for the condition to become true. + * @param pause delay between condition checks. + * @param errorMsg the error message to include in the <code>TimeoutException</code> if the + * condition was not met before timeout. + * @throws TimeoutException if the condition is not met before timeout. + * @throws InterruptedException if the thread is interrupted. + */ + @SuppressWarnings("BusyWait") + public static void waitUntilIgnoringExceptions( + Supplier<Boolean> condition, Duration timeout, Duration pause, String errorMsg) + throws TimeoutException, InterruptedException { + Supplier<Boolean> safeCondition = + () -> { + try { + return condition.get(); + } catch (Exception ignored) { + LOG.warn("Exception thrown while evaluating condition", ignored); + return false; + } + }; + + waitUtil(safeCondition, timeout, pause, errorMsg); + } + /** * Wait util the given condition is met or timeout. *
