This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2eb5d1ce886824fb9eb61847ab56ffba4223a2bf Author: Till Rohrmann <[email protected]> AuthorDate: Thu Apr 1 14:35:53 2021 +0200 [FLINK-21942][tests] Introduce TestingJobLeaderIdService and use it in ResourceManagerTest This closes #15407. --- .../resourcemanager/ResourceManagerTest.java | 39 +++-- .../resourcemanager/TestingJobLeaderIdService.java | 170 +++++++++++++++++++++ .../resourcemanager/TestingResourceManager.java | 7 - 3 files changed, 194 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index b6971ef..c073a15 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.heartbeat.HeartbeatServices; @@ -45,6 +46,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.ThrowingConsumer; @@ -55,7 +57,6 @@ import org.junit.BeforeClass; import org.junit.Test; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -68,6 +69,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; /** Tests for the {@link ResourceManager}. */ public class ResourceManagerTest extends TestLogger { @@ -321,11 +323,14 @@ public class ResourceManagerTest extends TestLogger { .build(); rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); + final OneShotLatch jobAdded = new OneShotLatch(); + final OneShotLatch jobRemoved = new OneShotLatch(); + final JobLeaderIdService jobLeaderIdService = - new DefaultJobLeaderIdService( - highAvailabilityServices, - rpcService.getScheduledExecutor(), - TestingUtils.infiniteTime()); + TestingJobLeaderIdService.newBuilder() + .setAddJobConsumer(ignored -> jobAdded.trigger()) + .setRemoveJobConsumer(ignored -> jobRemoved.trigger()) + .build(); resourceManager = createAndStartResourceManager(heartbeatServices, jobLeaderIdService); highAvailabilityServices.setJobMasterLeaderRetrieverFunction( @@ -343,18 +348,22 @@ public class ResourceManagerTest extends TestLogger { jobMasterGateway.getAddress(), jobId, TIMEOUT); - final boolean isAdded = runInMainThread(() -> jobLeaderIdService.containsJob(jobId)); - assertThat(isAdded, is(true)); - resourceManagerGateway.disconnectJobManager(jobId, jobStatus, null); - final boolean isRemoved = runInMainThread(() -> !jobLeaderIdService.containsJob(jobId)); - assertThat(isRemoved, is(jobStatus.isGloballyTerminalState())); - } + jobAdded.await(); - private <T> T runInMainThread(Callable<T> callable) throws Exception { - return resourceManager - .runInMainThread(callable, TIMEOUT) - .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + resourceManagerGateway.disconnectJobManager( + jobId, jobStatus, new FlinkException("Test exception")); + + if (jobStatus.isGloballyTerminalState()) { + jobRemoved.await(); + } else { + // job should not get removed + try { + jobRemoved.await(10L, TimeUnit.MILLISECONDS); + fail("We should not have removed the job."); + } catch (TimeoutException expected) { + } + } } private void runHeartbeatTimeoutTest( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingJobLeaderIdService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingJobLeaderIdService.java new file mode 100644 index 0000000..1057283 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingJobLeaderIdService.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JobMasterId; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** Testing {@link JobLeaderIdService} implementation. */ +public class TestingJobLeaderIdService implements JobLeaderIdService { + private final Consumer<JobLeaderIdActions> startConsumer; + private final Runnable stopRunnable; + private final Runnable clearRunnable; + private final Consumer<JobID> addJobConsumer; + private final Consumer<JobID> removeJobConsumer; + private final Function<JobID, Boolean> containsJobFunction; + private final Function<JobID, CompletableFuture<JobMasterId>> getLeaderIdFunction; + private final BiFunction<JobID, UUID, Boolean> isValidTimeoutFunction; + + private TestingJobLeaderIdService( + Consumer<JobLeaderIdActions> startConsumer, + Runnable stopRunnable, + Runnable clearRunnable, + Consumer<JobID> addJobConsumer, + Consumer<JobID> removeJobConsumer, + Function<JobID, Boolean> containsJobFunction, + Function<JobID, CompletableFuture<JobMasterId>> getLeaderIdFunction, + BiFunction<JobID, UUID, Boolean> isValidTimeoutFunction) { + this.startConsumer = startConsumer; + this.stopRunnable = stopRunnable; + this.clearRunnable = clearRunnable; + this.addJobConsumer = addJobConsumer; + this.removeJobConsumer = removeJobConsumer; + this.containsJobFunction = containsJobFunction; + this.getLeaderIdFunction = getLeaderIdFunction; + this.isValidTimeoutFunction = isValidTimeoutFunction; + } + + @Override + public void start(JobLeaderIdActions initialJobLeaderIdActions) throws Exception { + startConsumer.accept(initialJobLeaderIdActions); + } + + @Override + public void stop() throws Exception { + stopRunnable.run(); + } + + @Override + public void clear() throws Exception { + clearRunnable.run(); + } + + @Override + public void addJob(JobID jobId) { + addJobConsumer.accept(jobId); + } + + @Override + public void removeJob(JobID jobId) { + removeJobConsumer.accept(jobId); + } + + @Override + public boolean containsJob(JobID jobId) { + return containsJobFunction.apply(jobId); + } + + @Override + public CompletableFuture<JobMasterId> getLeaderId(JobID jobId) throws Exception { + return getLeaderIdFunction.apply(jobId); + } + + @Override + public boolean isValidTimeout(JobID jobId, UUID timeoutId) { + return isValidTimeoutFunction.apply(jobId, timeoutId); + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder { + private Consumer<JobLeaderIdActions> startConsumer = ignored -> {}; + private Runnable stopRunnable = () -> {}; + private Runnable clearRunnable = () -> {}; + private Consumer<JobID> addJobConsumer = ignored -> {}; + private Consumer<JobID> removeJobConsumer = ignored -> {}; + private Function<JobID, Boolean> containsJobFunction = ignored -> false; + private Function<JobID, CompletableFuture<JobMasterId>> getLeaderIdFunction = + ignored -> new CompletableFuture<>(); + private BiFunction<JobID, UUID, Boolean> isValidTimeoutFunction = + (ignoredA, ignoredB) -> false; + + public Builder setStartConsumer(Consumer<JobLeaderIdActions> startConsumer) { + this.startConsumer = startConsumer; + return this; + } + + public Builder setStopRunnable(Runnable stopRunnable) { + this.stopRunnable = stopRunnable; + return this; + } + + public Builder setClearRunnable(Runnable clearRunnable) { + this.clearRunnable = clearRunnable; + return this; + } + + public Builder setAddJobConsumer(Consumer<JobID> addJobConsumer) { + this.addJobConsumer = addJobConsumer; + return this; + } + + public Builder setRemoveJobConsumer(Consumer<JobID> removeJobConsumer) { + this.removeJobConsumer = removeJobConsumer; + return this; + } + + public Builder setContainsJobFunction(Function<JobID, Boolean> containsJobFunction) { + this.containsJobFunction = containsJobFunction; + return this; + } + + public Builder setGetLeaderIdFunction( + Function<JobID, CompletableFuture<JobMasterId>> getLeaderIdFunction) { + this.getLeaderIdFunction = getLeaderIdFunction; + return this; + } + + public Builder setIsValidTimeoutFunction( + BiFunction<JobID, UUID, Boolean> isValidTimeoutFunction) { + this.isValidTimeoutFunction = isValidTimeoutFunction; + return this; + } + + public TestingJobLeaderIdService build() { + return new TestingJobLeaderIdService( + startConsumer, + stopRunnable, + clearRunnable, + addJobConsumer, + removeJobConsumer, + containsJobFunction, + getLeaderIdFunction, + isValidTimeoutFunction); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java index f3b6396..6479a45 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.resourcemanager; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.ClusterInformation; @@ -34,8 +33,6 @@ import org.apache.flink.runtime.rpc.RpcUtils; import javax.annotation.Nullable; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ForkJoinPool; /** Simple {@link ResourceManager} implementation for testing purposes. */ @@ -98,8 +95,4 @@ public class TestingResourceManager extends ResourceManager<ResourceID> { // cannot stop workers return false; } - - <T> CompletableFuture<T> runInMainThread(Callable<T> callable, Time timeout) { - return callAsync(callable, timeout); - } }
