This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 94d861b08fef1e350d80a3f5f0f63168d327bc64 Author: Feng Jin <jinfeng1...@gmail.com> AuthorDate: Tue May 14 11:18:40 2024 +0800 [FLINK-35342][table] Fix MaterializedTableStatementITCase test can check for wrong status --- .../service/MaterializedTableStatementITCase.java | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index 105c51ea597..dd7d25e124c 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -272,7 +272,7 @@ public class MaterializedTableStatementITCase { waitUntilAllTasksAreRunning( restClusterClient, JobID.fromHexString(activeRefreshHandler.getJobId())); - // check the background job is running + // verify the background job is running String describeJobDDL = String.format("DESCRIBE JOB '%s'", activeRefreshHandler.getJobId()); OperationHandle describeJobHandle = service.executeStatement(sessionHandle, describeJobDDL, -1, new Configuration()); @@ -653,7 +653,7 @@ public class MaterializedTableStatementITCase { assertThat(suspendMaterializedTable.getRefreshStatus()) .isEqualTo(CatalogMaterializedTable.RefreshStatus.SUSPENDED); - // check background job is stopped + // verify background job is stopped byte[] refreshHandler = suspendMaterializedTable.getSerializedRefreshHandler(); ContinuousRefreshHandler suspendRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( @@ -667,7 +667,7 @@ public class MaterializedTableStatementITCase { List<RowData> jobResults = fetchAllResults(service, sessionHandle, describeJobHandle); assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("FINISHED"); - // check savepoint is created + // verify savepoint is created assertThat(suspendRefreshHandler.getRestorePath()).isNotEmpty(); String actualSavepointPath = suspendRefreshHandler.getRestorePath().get(); @@ -692,7 +692,17 @@ public class MaterializedTableStatementITCase { assertThat(resumedCatalogMaterializedTable.getRefreshStatus()) .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED); - // check background job is running + waitUntilAllTasksAreRunning( + restClusterClient, + JobID.fromHexString( + ContinuousRefreshHandlerSerializer.INSTANCE + .deserialize( + resumedCatalogMaterializedTable + .getSerializedRefreshHandler(), + getClass().getClassLoader()) + .getJobId())); + + // verify background job is running refreshHandler = resumedCatalogMaterializedTable.getSerializedRefreshHandler(); ContinuousRefreshHandler resumeRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( @@ -706,7 +716,7 @@ public class MaterializedTableStatementITCase { jobResults = fetchAllResults(service, sessionHandle, describeResumeJobHandle); assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING"); - // check resumed job is restored from savepoint + // verify resumed job is restored from savepoint Optional<String> actualRestorePath = getJobRestoreSavepointPath(restClusterClient, resumeJobId); assertThat(actualRestorePath).isNotEmpty();