This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ebed442e4b1dbe3df5948dcf5cee6c3d38b7e8f5 Author: Chesnay Schepler <[email protected]> AuthorDate: Sat Apr 2 16:51:15 2022 +0200 [FLINK-26995][tests] Fix tests failing with fork-reuse enabled - PipelinedApproximateSubpartitionTest re-uses PipelinedSubpartitionTest, but the latter shuts down the executor service - StreamGraphGeneratorTest relied on hard-coded transformation ids - TaskManagerRunnerTest nulled the security manager which could interfere with other tests --- .../io/network/partition/PipelinedSubpartitionTest.java | 16 +++++++--------- .../runtime/taskexecutor/TaskManagerRunnerTest.java | 1 - .../streaming/api/graph/StreamGraphGeneratorTest.java | 9 ++++++++- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index d5254c246ff..7e2b8a5b245 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -29,12 +29,13 @@ import org.apache.flink.runtime.io.network.util.TestConsumerCallback; import org.apache.flink.runtime.io.network.util.TestProducerSource; import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer; import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer; +import org.apache.flink.testutils.executor.TestExecutorResource; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.CheckedSupplier; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; +import org.junit.ClassRule; import org.junit.Test; import java.util.Arrays; @@ -62,12 +63,9 @@ import static org.mockito.Mockito.when; public class PipelinedSubpartitionTest extends SubpartitionTestBase { /** Executor service for concurrent produce/consume tests. */ - private static final ExecutorService executorService = Executors.newCachedThreadPool(); - - @AfterClass - public static void shutdownExecutorService() throws Exception { - executorService.shutdownNow(); - } + @ClassRule + public static final TestExecutorResource<ExecutorService> EXECUTOR_RESOURCE = + new TestExecutorResource<>(() -> Executors.newCachedThreadPool()); @Override PipelinedSubpartition createSubpartition() throws Exception { @@ -207,10 +205,10 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { CompletableFuture<Boolean> producerResult = CompletableFuture.supplyAsync( - CheckedSupplier.unchecked(producer::call), executorService); + CheckedSupplier.unchecked(producer::call), EXECUTOR_RESOURCE.getExecutor()); CompletableFuture<Boolean> consumerResult = CompletableFuture.supplyAsync( - CheckedSupplier.unchecked(consumer::call), executorService); + CheckedSupplier.unchecked(consumer::call), EXECUTOR_RESOURCE.getExecutor()); FutureUtils.waitForAll(Arrays.asList(producerResult, consumerResult)) .get(60_000L, TimeUnit.MILLISECONDS); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java index 5980bc110cf..6406c1d06de 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java @@ -71,7 +71,6 @@ public class TaskManagerRunnerTest extends TestLogger { @After public void after() throws Exception { - System.setSecurityManager(null); if (taskManagerRunner != null) { taskManagerRunner.close(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 92e963ef8aa..7580642d420 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -86,6 +86,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.hamcrest.MatcherAssert.assertThat; @@ -843,7 +844,13 @@ public class StreamGraphGeneratorTest extends TestLogger { partitionStream.map(value -> value).print(); final StreamGraph streamGraph = env.getStreamGraph(); - Assertions.assertThat(streamGraph.getStreamEdges(1, 3)) + + final List<Integer> nodeIds = + streamGraph.getStreamNodes().stream() + .map(StreamNode::getId) + .sorted(Integer::compare) + .collect(Collectors.toList()); + Assertions.assertThat(streamGraph.getStreamEdges(nodeIds.get(0), nodeIds.get(1))) .hasSize(1) .satisfies( e ->
