This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ebed442e4b1dbe3df5948dcf5cee6c3d38b7e8f5
Author: Chesnay Schepler <[email protected]>
AuthorDate: Sat Apr 2 16:51:15 2022 +0200

    [FLINK-26995][tests] Fix tests failing with fork-reuse enabled
    
    - PipelinedApproximateSubpartitionTest re-uses PipelinedSubpartitionTest,
      but the latter shuts down the executor service
    - StreamGraphGeneratorTest relied on hard-coded transformation ids
    - TaskManagerRunnerTest nulled the security manager, which could interfere
      with other tests
---
 .../io/network/partition/PipelinedSubpartitionTest.java  | 16 +++++++---------
 .../runtime/taskexecutor/TaskManagerRunnerTest.java      |  1 -
 .../streaming/api/graph/StreamGraphGeneratorTest.java    |  9 ++++++++-
 3 files changed, 15 insertions(+), 11 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index d5254c246ff..7e2b8a5b245 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -29,12 +29,13 @@ import 
org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
+import org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.CheckedSupplier;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assume;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -62,12 +63,9 @@ import static org.mockito.Mockito.when;
 public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
     /** Executor service for concurrent produce/consume tests. */
-    private static final ExecutorService executorService = 
Executors.newCachedThreadPool();
-
-    @AfterClass
-    public static void shutdownExecutorService() throws Exception {
-        executorService.shutdownNow();
-    }
+    @ClassRule
+    public static final TestExecutorResource<ExecutorService> 
EXECUTOR_RESOURCE =
+            new TestExecutorResource<>(() -> Executors.newCachedThreadPool());
 
     @Override
     PipelinedSubpartition createSubpartition() throws Exception {
@@ -207,10 +205,10 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
 
         CompletableFuture<Boolean> producerResult =
                 CompletableFuture.supplyAsync(
-                        CheckedSupplier.unchecked(producer::call), 
executorService);
+                        CheckedSupplier.unchecked(producer::call), 
EXECUTOR_RESOURCE.getExecutor());
         CompletableFuture<Boolean> consumerResult =
                 CompletableFuture.supplyAsync(
-                        CheckedSupplier.unchecked(consumer::call), 
executorService);
+                        CheckedSupplier.unchecked(consumer::call), 
EXECUTOR_RESOURCE.getExecutor());
 
         FutureUtils.waitForAll(Arrays.asList(producerResult, consumerResult))
                 .get(60_000L, TimeUnit.MILLISECONDS);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
index 5980bc110cf..6406c1d06de 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
@@ -71,7 +71,6 @@ public class TaskManagerRunnerTest extends TestLogger {
 
     @After
     public void after() throws Exception {
-        System.setSecurityManager(null);
         if (taskManagerRunner != null) {
             taskManagerRunner.close();
         }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 92e963ef8aa..7580642d420 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -86,6 +86,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -843,7 +844,13 @@ public class StreamGraphGeneratorTest extends TestLogger {
         partitionStream.map(value -> value).print();
 
         final StreamGraph streamGraph = env.getStreamGraph();
-        Assertions.assertThat(streamGraph.getStreamEdges(1, 3))
+
+        final List<Integer> nodeIds =
+                streamGraph.getStreamNodes().stream()
+                        .map(StreamNode::getId)
+                        .sorted(Integer::compare)
+                        .collect(Collectors.toList());
+        Assertions.assertThat(streamGraph.getStreamEdges(nodeIds.get(0), 
nodeIds.get(1)))
                 .hasSize(1)
                 .satisfies(
                         e ->

Reply via email to