XComp commented on code in PR #23203:
URL: https://github.com/apache/flink/pull/23203#discussion_r1294385242


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java:
##########
@@ -328,19 +260,17 @@ public void 
testInternalRunScheduledTasks_correctExecutionOrder() {
 
         ctx.runScheduledTasks();
 
-        assertThat(thirdRun.get(), is(true));
+        assertThat(thirdRun).isTrue();
     }
 
     @Test
-    public void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() {
+    void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() {
         MockContext ctx = new MockContext();
 
         AtomicBoolean executed = new AtomicBoolean(false);
         Runnable executeOnce =
                 () -> {
-                    if (executed.get()) {
-                        fail("Multiple executions");
-                    }
+                    assertThat(executed).as("Multiple executions").isFalse();

Review Comment:
   That's a bit out-of-scope for this PR/hotfix commit, but: I'm wondering 
whether we should also add a `assertThat(executed).isTrue();` to the end of 
this test to ensure that the task was actually executed.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraphTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link StateWithoutExecutionGraph} state. */
+public class StateWithoutExecutionGraphTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CreatedTest.class);
+
+    @Test
+    void testCancelTransitionsToFinished() throws Exception {
+        try (MockStateWithoutExecutionGraphContext ctx =
+                new MockStateWithoutExecutionGraphContext()) {
+            TestingStateWithoutExecutionGraph state =
+                    new TestingStateWithoutExecutionGraph(ctx, LOG);
+
+            ctx.setExpectFinished(
+                    (archivedExecutionGraph ->
+                            
Assertions.assertThat(archivedExecutionGraph.getState())
+                                    .isEqualTo(JobStatus.CANCELED)));

Review Comment:
   Unnecessary brackets



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraphTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link StateWithoutExecutionGraph} state. */
+public class StateWithoutExecutionGraphTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CreatedTest.class);
+
+    @Test
+    void testCancelTransitionsToFinished() throws Exception {
+        try (MockStateWithoutExecutionGraphContext ctx =
+                new MockStateWithoutExecutionGraphContext()) {
+            TestingStateWithoutExecutionGraph state =
+                    new TestingStateWithoutExecutionGraph(ctx, LOG);
+
+            ctx.setExpectFinished(
+                    (archivedExecutionGraph ->
+                            
Assertions.assertThat(archivedExecutionGraph.getState())

Review Comment:
   can we add a `isNull` check to verify `getFailureInfo` here as well?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraphTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link StateWithoutExecutionGraph} state. */
+public class StateWithoutExecutionGraphTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CreatedTest.class);
+
+    @Test
+    void testCancelTransitionsToFinished() throws Exception {
+        try (MockStateWithoutExecutionGraphContext ctx =
+                new MockStateWithoutExecutionGraphContext()) {
+            TestingStateWithoutExecutionGraph state =
+                    new TestingStateWithoutExecutionGraph(ctx, LOG);
+
+            ctx.setExpectFinished(
+                    (archivedExecutionGraph ->
+                            
Assertions.assertThat(archivedExecutionGraph.getState())
+                                    .isEqualTo(JobStatus.CANCELED)));
+            state.cancel();
+        }
+    }
+
+    @Test
+    void testSuspendTransitionsToFinished() throws Exception {
+        try (MockStateWithoutExecutionGraphContext ctx =
+                new MockStateWithoutExecutionGraphContext()) {
+            TestingStateWithoutExecutionGraph state =
+                    new TestingStateWithoutExecutionGraph(ctx, LOG);
+
+            ctx.setExpectFinished(
+                    archivedExecutionGraph -> {
+                        assertThat(archivedExecutionGraph.getState())
+                                .isEqualTo(JobStatus.SUSPENDED);
+                        
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
+                    });
+
+            state.suspend(new FlinkException("Job has been suspended."));

Review Comment:
   We can move the `FlinkException` into a local variable and verify the 
`getFailureInfo()` in more detail.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraphTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link StateWithoutExecutionGraph} state. */
+public class StateWithoutExecutionGraphTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CreatedTest.class);
+
+    @Test
+    void testCancelTransitionsToFinished() throws Exception {
+        try (MockStateWithoutExecutionGraphContext ctx =
+                new MockStateWithoutExecutionGraphContext()) {
+            TestingStateWithoutExecutionGraph state =
+                    new TestingStateWithoutExecutionGraph(ctx, LOG);
+
+            ctx.setExpectFinished(
+                    (archivedExecutionGraph ->
+                            
Assertions.assertThat(archivedExecutionGraph.getState())
+                                    .isEqualTo(JobStatus.CANCELED)));
+            state.cancel();
+        }
+    }
+
+    @Test
+    void testSuspendTransitionsToFinished() throws Exception {
+        try (MockStateWithoutExecutionGraphContext ctx =
+                new MockStateWithoutExecutionGraphContext()) {
+            TestingStateWithoutExecutionGraph state =
+                    new TestingStateWithoutExecutionGraph(ctx, LOG);
+
+            ctx.setExpectFinished(
+                    archivedExecutionGraph -> {
+                        assertThat(archivedExecutionGraph.getState())
+                                .isEqualTo(JobStatus.SUSPENDED);
+                        
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
+                    });
+
+            state.suspend(new FlinkException("Job has been suspended."));
+        }
+    }
+
+    @Test
+    void testTransitionToFinishedOnGlobalFailure() throws Exception {
+        final String testExceptionString = "This is a test exception";
+        try (MockStateWithoutExecutionGraphContext ctx =
+                new MockStateWithoutExecutionGraphContext()) {
+            TestingStateWithoutExecutionGraph state =
+                    new TestingStateWithoutExecutionGraph(ctx, LOG);
+
+            ctx.setExpectFinished(
+                    archivedExecutionGraph -> {
+                        
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
+                        
assertThat(archivedExecutionGraph.getFailureInfo()).isNotNull();
+                        
assertThat(archivedExecutionGraph.getFailureInfo().getExceptionAsString())
+                                .contains(testExceptionString);
+                    });
+
+            state.handleGlobalFailure(
+                    new RuntimeException(testExceptionString),

Review Comment:
   We could make this test a bit more explicit by providing the actual 
exception instance in a local variable. We can assert on 
`getFailureInfo().getException()` then instead of `getExceptionAsString()`. 
WDYT?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockStateWithoutExecutionGraphContext.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+
+import org.jetbrains.annotations.Nullable;
+
+import java.util.function.Consumer;
+
+/** Mock the {@link StateWithoutExecutionGraph.Context}. */
+class MockStateWithoutExecutionGraphContext
+        implements StateWithoutExecutionGraph.Context, AutoCloseable {
+
+    private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
+            new StateValidator<>("Finished");
+
+    protected boolean hadStateTransition = false;

Review Comment:
   I'm not a big fan of protected fields. The extending classes access the 
field now but the reader might initially struggle to grasp where the field is 
coming from. Instead, could we make this `private` and add a method 
`registerStateTransition()`? The extending classes could then communicate 
solely based on protected methods.  WDYT?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java:
##########
@@ -468,7 +390,7 @@ public ScheduledFuture<?> runIfState(State expectedState, 
Runnable action, Durat
             final ScheduledTask<Void> scheduledTask =
                     new ScheduledTask<>(
                             () -> {
-                                if (!hasStateTransition) {
+                                if (!hadStateTransition) {

Review Comment:
   What about using `hadStateTransition()` instead?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithoutExecutionGraph.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstract state class which contains an {@link Context} and {@link #logger} 
to execute common

Review Comment:
   ```suggestion
    * Abstract state class which contains its {@link Context} and {@link 
#logger} to execute common
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockStateWithoutExecutionGraphContext.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+
+import org.jetbrains.annotations.Nullable;
+
+import java.util.function.Consumer;
+
+/** Mock the {@link StateWithoutExecutionGraph.Context}. */
+class MockStateWithoutExecutionGraphContext
+        implements StateWithoutExecutionGraph.Context, AutoCloseable {
+
+    private final StateValidator<ArchivedExecutionGraph> 
finishedStateValidator =
+            new StateValidator<>("Finished");
+
+    protected boolean hadStateTransition = false;

Review Comment:
   This might open typo fix proposals because devs are used to using `has` as a 
common token in field names. What about changing this to `hasStateTransitioned`?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockStateWithoutExecutionGraphContext.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+
+import org.jetbrains.annotations.Nullable;
+
+import java.util.function.Consumer;
+
+/** Mock the {@link StateWithoutExecutionGraph.Context}. */
+class MockStateWithoutExecutionGraphContext
+        implements StateWithoutExecutionGraph.Context, AutoCloseable {

Review Comment:
   I know that this is how it's done in the other state test cases, but: 
`AutoCloseable` not the JUnit5 way of doing things. We could use 
`AfterEachCallback` instead and wouldn't have to do the try blocks in the tests 
anymore. Instead, we would have a single member `testContext` that is 
instantiated by JUnit and does the verification after each test run. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to