1996fanrui commented on code in PR #23203: URL: https://github.com/apache/flink/pull/23203#discussion_r1294572274
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraphTest.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.failure.FailureEnricherUtils; +import org.apache.flink.util.FlinkException; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link StateWithoutExecutionGraph} state. 
*/ +public class StateWithoutExecutionGraphTest { + + private static final Logger LOG = LoggerFactory.getLogger(CreatedTest.class); + + @Test + void testCancelTransitionsToFinished() throws Exception { + try (MockStateWithoutExecutionGraphContext ctx = + new MockStateWithoutExecutionGraphContext()) { + TestingStateWithoutExecutionGraph state = + new TestingStateWithoutExecutionGraph(ctx, LOG); + + ctx.setExpectFinished( + (archivedExecutionGraph -> + Assertions.assertThat(archivedExecutionGraph.getState()) + .isEqualTo(JobStatus.CANCELED))); + state.cancel(); + } + } + + @Test + void testSuspendTransitionsToFinished() throws Exception { + try (MockStateWithoutExecutionGraphContext ctx = + new MockStateWithoutExecutionGraphContext()) { + TestingStateWithoutExecutionGraph state = + new TestingStateWithoutExecutionGraph(ctx, LOG); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()) + .isEqualTo(JobStatus.SUSPENDED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); + }); + + state.suspend(new FlinkException("Job has been suspended.")); + } + } + + @Test + void testTransitionToFinishedOnGlobalFailure() throws Exception { + final String testExceptionString = "This is a test exception"; + try (MockStateWithoutExecutionGraphContext ctx = + new MockStateWithoutExecutionGraphContext()) { + TestingStateWithoutExecutionGraph state = + new TestingStateWithoutExecutionGraph(ctx, LOG); + + ctx.setExpectFinished( + archivedExecutionGraph -> { + assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED); + assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull(); + assertThat(archivedExecutionGraph.getFailureInfo().getExceptionAsString()) + .contains(testExceptionString); + }); + + state.handleGlobalFailure( + new RuntimeException(testExceptionString), Review Comment: The `getFailureInfo().getException()` is `SerializedThrowable`, it cannot check the exception instance directly. 
The `testExceptionString` is enough to check whether the exception is expected, so I prefer to leave it as it is. WDYT? BTW, I check the `testExceptionString` in the last test as well. ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java: ########## @@ -328,19 +260,17 @@ public void testInternalRunScheduledTasks_correctExecutionOrder() { ctx.runScheduledTasks(); - assertThat(thirdRun.get(), is(true)); + assertThat(thirdRun).isTrue(); } @Test - public void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() { + void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() { MockContext ctx = new MockContext(); AtomicBoolean executed = new AtomicBoolean(false); Runnable executeOnce = () -> { - if (executed.get()) { - fail("Multiple executions"); - } + assertThat(executed).as("Multiple executions").isFalse(); Review Comment: > That's a bit out-of-scope for this PR/hotfix commit Do you mean we shouldn't call `assertThat(executed).as("Multiple executions").isFalse();` here? The `fail` is similar to the old check, so I updated it to `assertThat`. > I'm wondering whether we should also add a assertThat(executed).isTrue(); to the end of this test to ensure that the task was actually executed. Good catch, I think we should add it. Because the runnable is `executeOnce`, we should ensure it is called, and only once. ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockStateWithoutExecutionGraphContext.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; + +import org.jetbrains.annotations.Nullable; + +import java.util.function.Consumer; + +/** Mock the {@link StateWithoutExecutionGraph.Context}. */ +class MockStateWithoutExecutionGraphContext + implements StateWithoutExecutionGraph.Context, AutoCloseable { + + private final StateValidator<ArchivedExecutionGraph> finishedStateValidator = + new StateValidator<>("Finished"); + + protected boolean hadStateTransition = false; Review Comment: It helps with field security, updated. ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockStateWithoutExecutionGraphContext.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; + +import org.jetbrains.annotations.Nullable; + +import java.util.function.Consumer; + +/** Mock the {@link StateWithoutExecutionGraph.Context}. */ +class MockStateWithoutExecutionGraphContext + implements StateWithoutExecutionGraph.Context, AutoCloseable { Review Comment: Sounds good to me. Using `AfterEachCallback` instead of `AutoCloseable`, and adding `@RegisterExtension` to each test class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
