This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f915f0962008e6dfa225dae5840544f71b6055b8 Author: Zhu Zhu <[email protected]> AuthorDate: Thu Nov 4 14:32:57 2021 +0800 [hotfix][tests] Factor PerJobMiniClusterFactoryTest#MyCancellableInvokable out to be a standalone test class in flink-runtime --- .../program/PerJobMiniClusterFactoryTest.java | 36 +-------------- .../testutils/WaitingCancelableInvokable.java | 54 ++++++++++++++++++++++ 2 files changed, 56 insertions(+), 34 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java index 7061f64..bd099fc 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java @@ -22,21 +22,18 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; -import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.testutils.CancelableInvokable; +import org.apache.flink.runtime.testutils.WaitingCancelableInvokable; import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Test; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; import static org.hamcrest.CoreMatchers.is; @@ -171,37 +168,8 @@ public class PerJobMiniClusterFactoryTest extends TestLogger { private static JobGraph 
getCancellableJobGraph() { JobVertex jobVertex = new JobVertex("jobVertex"); - jobVertex.setInvokableClass(MyCancellableInvokable.class); + jobVertex.setInvokableClass(WaitingCancelableInvokable.class); jobVertex.setParallelism(1); return JobGraphTestUtils.streamingJobGraph(jobVertex); } - - /** Invokable which waits until it is cancelled. */ - public static class MyCancellableInvokable extends CancelableInvokable { - - private final Object lock = new Object(); - private boolean running = true; - - public MyCancellableInvokable(Environment environment) { - super(environment); - } - - @Override - public void doInvoke() throws Exception { - synchronized (lock) { - while (running) { - lock.wait(); - } - } - } - - @Override - public Future<Void> cancel() { - synchronized (lock) { - running = false; - lock.notifyAll(); - } - return CompletableFuture.completedFuture(null); - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/WaitingCancelableInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/WaitingCancelableInvokable.java new file mode 100644 index 0000000..6c68d66 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/WaitingCancelableInvokable.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testutils; + +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + +/** An {@link AbstractInvokable} which waits until it is canceled. */ +public class WaitingCancelableInvokable extends CancelableInvokable { + + private final Object lock = new Object(); + private boolean running = true; + + public WaitingCancelableInvokable(Environment environment) { + super(environment); + } + + @Override + public void doInvoke() throws Exception { + synchronized (lock) { + while (running) { + lock.wait(); + } + } + } + + @Override + public Future<Void> cancel() { + synchronized (lock) { + running = false; + lock.notifyAll(); + } + return CompletableFuture.completedFuture(null); + } +}
