This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 94d861b08fef1e350d80a3f5f0f63168d327bc64
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Tue May 14 11:18:40 2024 +0800

    [FLINK-35342][table] Fix MaterializedTableStatementITCase test can check for wrong status
---
 .../service/MaterializedTableStatementITCase.java    | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 105c51ea597..dd7d25e124c 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -272,7 +272,7 @@ public class MaterializedTableStatementITCase {
         waitUntilAllTasksAreRunning(
                 restClusterClient, JobID.fromHexString(activeRefreshHandler.getJobId()));
 
-        // check the background job is running
+        // verify the background job is running
         String describeJobDDL = String.format("DESCRIBE JOB '%s'", activeRefreshHandler.getJobId());
         OperationHandle describeJobHandle =
                 service.executeStatement(sessionHandle, describeJobDDL, -1, new Configuration());
@@ -653,7 +653,7 @@ public class MaterializedTableStatementITCase {
         assertThat(suspendMaterializedTable.getRefreshStatus())
                 .isEqualTo(CatalogMaterializedTable.RefreshStatus.SUSPENDED);
 
-        // check background job is stopped
+        // verify background job is stopped
         byte[] refreshHandler = suspendMaterializedTable.getSerializedRefreshHandler();
         ContinuousRefreshHandler suspendRefreshHandler =
                 ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
@@ -667,7 +667,7 @@ public class MaterializedTableStatementITCase {
         List<RowData> jobResults = fetchAllResults(service, sessionHandle, describeJobHandle);
         assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("FINISHED");
 
-        // check savepoint is created
+        // verify savepoint is created
         assertThat(suspendRefreshHandler.getRestorePath()).isNotEmpty();
         String actualSavepointPath = suspendRefreshHandler.getRestorePath().get();
 
@@ -692,7 +692,17 @@ public class MaterializedTableStatementITCase {
         assertThat(resumedCatalogMaterializedTable.getRefreshStatus())
                 .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
 
-        // check background job is running
+        waitUntilAllTasksAreRunning(
+                restClusterClient,
+                JobID.fromHexString(
+                        ContinuousRefreshHandlerSerializer.INSTANCE
+                                .deserialize(
+                                        resumedCatalogMaterializedTable
+                                                .getSerializedRefreshHandler(),
+                                        getClass().getClassLoader())
+                                .getJobId()));
+
+        // verify background job is running
         refreshHandler = resumedCatalogMaterializedTable.getSerializedRefreshHandler();
         ContinuousRefreshHandler resumeRefreshHandler =
                 ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
@@ -706,7 +716,7 @@ public class MaterializedTableStatementITCase {
         jobResults = fetchAllResults(service, sessionHandle, describeResumeJobHandle);
         assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");
 
-        // check resumed job is restored from savepoint
+        // verify resumed job is restored from savepoint
         Optional<String> actualRestorePath =
                 getJobRestoreSavepointPath(restClusterClient, resumeJobId);
         assertThat(actualRestorePath).isNotEmpty();

Reply via email to