This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2eb5d1ce886824fb9eb61847ab56ffba4223a2bf
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Apr 1 14:35:53 2021 +0200

    [FLINK-21942][tests] Introduce TestingJobLeaderIdService and use it in 
ResourceManagerTest
    
    This closes #15407.
---
 .../resourcemanager/ResourceManagerTest.java       |  39 +++--
 .../resourcemanager/TestingJobLeaderIdService.java | 170 +++++++++++++++++++++
 .../resourcemanager/TestingResourceManager.java    |   7 -
 3 files changed, 194 insertions(+), 22 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index b6971ef..c073a15 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -45,6 +46,7 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.ThrowingConsumer;
 
@@ -55,7 +57,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -68,6 +69,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /** Tests for the {@link ResourceManager}. */
 public class ResourceManagerTest extends TestLogger {
@@ -321,11 +323,14 @@ public class ResourceManagerTest extends TestLogger {
                         .build();
         rpcService.registerGateway(jobMasterGateway.getAddress(), 
jobMasterGateway);
 
+        final OneShotLatch jobAdded = new OneShotLatch();
+        final OneShotLatch jobRemoved = new OneShotLatch();
+
         final JobLeaderIdService jobLeaderIdService =
-                new DefaultJobLeaderIdService(
-                        highAvailabilityServices,
-                        rpcService.getScheduledExecutor(),
-                        TestingUtils.infiniteTime());
+                TestingJobLeaderIdService.newBuilder()
+                        .setAddJobConsumer(ignored -> jobAdded.trigger())
+                        .setRemoveJobConsumer(ignored -> jobRemoved.trigger())
+                        .build();
         resourceManager = createAndStartResourceManager(heartbeatServices, 
jobLeaderIdService);
 
         highAvailabilityServices.setJobMasterLeaderRetrieverFunction(
@@ -343,18 +348,22 @@ public class ResourceManagerTest extends TestLogger {
                 jobMasterGateway.getAddress(),
                 jobId,
                 TIMEOUT);
-        final boolean isAdded = runInMainThread(() -> 
jobLeaderIdService.containsJob(jobId));
-        assertThat(isAdded, is(true));
 
-        resourceManagerGateway.disconnectJobManager(jobId, jobStatus, null);
-        final boolean isRemoved = runInMainThread(() -> 
!jobLeaderIdService.containsJob(jobId));
-        assertThat(isRemoved, is(jobStatus.isGloballyTerminalState()));
-    }
+        jobAdded.await();
 
-    private <T> T runInMainThread(Callable<T> callable) throws Exception {
-        return resourceManager
-                .runInMainThread(callable, TIMEOUT)
-                .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+        resourceManagerGateway.disconnectJobManager(
+                jobId, jobStatus, new FlinkException("Test exception"));
+
+        if (jobStatus.isGloballyTerminalState()) {
+            jobRemoved.await();
+        } else {
+            // job should not get removed
+            try {
+                jobRemoved.await(10L, TimeUnit.MILLISECONDS);
+                fail("We should not have removed the job.");
+            } catch (TimeoutException expected) {
+            }
+        }
     }
 
     private void runHeartbeatTimeoutTest(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingJobLeaderIdService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingJobLeaderIdService.java
new file mode 100644
index 0000000..1057283
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingJobLeaderIdService.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/** Testing {@link JobLeaderIdService} implementation. */
+public class TestingJobLeaderIdService implements JobLeaderIdService {
+    private final Consumer<JobLeaderIdActions> startConsumer;
+    private final Runnable stopRunnable;
+    private final Runnable clearRunnable;
+    private final Consumer<JobID> addJobConsumer;
+    private final Consumer<JobID> removeJobConsumer;
+    private final Function<JobID, Boolean> containsJobFunction;
+    private final Function<JobID, CompletableFuture<JobMasterId>> 
getLeaderIdFunction;
+    private final BiFunction<JobID, UUID, Boolean> isValidTimeoutFunction;
+
+    private TestingJobLeaderIdService(
+            Consumer<JobLeaderIdActions> startConsumer,
+            Runnable stopRunnable,
+            Runnable clearRunnable,
+            Consumer<JobID> addJobConsumer,
+            Consumer<JobID> removeJobConsumer,
+            Function<JobID, Boolean> containsJobFunction,
+            Function<JobID, CompletableFuture<JobMasterId>> 
getLeaderIdFunction,
+            BiFunction<JobID, UUID, Boolean> isValidTimeoutFunction) {
+        this.startConsumer = startConsumer;
+        this.stopRunnable = stopRunnable;
+        this.clearRunnable = clearRunnable;
+        this.addJobConsumer = addJobConsumer;
+        this.removeJobConsumer = removeJobConsumer;
+        this.containsJobFunction = containsJobFunction;
+        this.getLeaderIdFunction = getLeaderIdFunction;
+        this.isValidTimeoutFunction = isValidTimeoutFunction;
+    }
+
+    @Override
+    public void start(JobLeaderIdActions initialJobLeaderIdActions) throws 
Exception {
+        startConsumer.accept(initialJobLeaderIdActions);
+    }
+
+    @Override
+    public void stop() throws Exception {
+        stopRunnable.run();
+    }
+
+    @Override
+    public void clear() throws Exception {
+        clearRunnable.run();
+    }
+
+    @Override
+    public void addJob(JobID jobId) {
+        addJobConsumer.accept(jobId);
+    }
+
+    @Override
+    public void removeJob(JobID jobId) {
+        removeJobConsumer.accept(jobId);
+    }
+
+    @Override
+    public boolean containsJob(JobID jobId) {
+        return containsJobFunction.apply(jobId);
+    }
+
+    @Override
+    public CompletableFuture<JobMasterId> getLeaderId(JobID jobId) throws 
Exception {
+        return getLeaderIdFunction.apply(jobId);
+    }
+
+    @Override
+    public boolean isValidTimeout(JobID jobId, UUID timeoutId) {
+        return isValidTimeoutFunction.apply(jobId, timeoutId);
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static final class Builder {
+        private Consumer<JobLeaderIdActions> startConsumer = ignored -> {};
+        private Runnable stopRunnable = () -> {};
+        private Runnable clearRunnable = () -> {};
+        private Consumer<JobID> addJobConsumer = ignored -> {};
+        private Consumer<JobID> removeJobConsumer = ignored -> {};
+        private Function<JobID, Boolean> containsJobFunction = ignored -> 
false;
+        private Function<JobID, CompletableFuture<JobMasterId>> 
getLeaderIdFunction =
+                ignored -> new CompletableFuture<>();
+        private BiFunction<JobID, UUID, Boolean> isValidTimeoutFunction =
+                (ignoredA, ignoredB) -> false;
+
+        public Builder setStartConsumer(Consumer<JobLeaderIdActions> 
startConsumer) {
+            this.startConsumer = startConsumer;
+            return this;
+        }
+
+        public Builder setStopRunnable(Runnable stopRunnable) {
+            this.stopRunnable = stopRunnable;
+            return this;
+        }
+
+        public Builder setClearRunnable(Runnable clearRunnable) {
+            this.clearRunnable = clearRunnable;
+            return this;
+        }
+
+        public Builder setAddJobConsumer(Consumer<JobID> addJobConsumer) {
+            this.addJobConsumer = addJobConsumer;
+            return this;
+        }
+
+        public Builder setRemoveJobConsumer(Consumer<JobID> removeJobConsumer) 
{
+            this.removeJobConsumer = removeJobConsumer;
+            return this;
+        }
+
+        public Builder setContainsJobFunction(Function<JobID, Boolean> 
containsJobFunction) {
+            this.containsJobFunction = containsJobFunction;
+            return this;
+        }
+
+        public Builder setGetLeaderIdFunction(
+                Function<JobID, CompletableFuture<JobMasterId>> 
getLeaderIdFunction) {
+            this.getLeaderIdFunction = getLeaderIdFunction;
+            return this;
+        }
+
+        public Builder setIsValidTimeoutFunction(
+                BiFunction<JobID, UUID, Boolean> isValidTimeoutFunction) {
+            this.isValidTimeoutFunction = isValidTimeoutFunction;
+            return this;
+        }
+
+        public TestingJobLeaderIdService build() {
+            return new TestingJobLeaderIdService(
+                    startConsumer,
+                    stopRunnable,
+                    clearRunnable,
+                    addJobConsumer,
+                    removeJobConsumer,
+                    containsJobFunction,
+                    getLeaderIdFunction,
+                    isValidTimeoutFunction);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index f3b6396..6479a45 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -34,8 +33,6 @@ import org.apache.flink.runtime.rpc.RpcUtils;
 
 import javax.annotation.Nullable;
 
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ForkJoinPool;
 
 /** Simple {@link ResourceManager} implementation for testing purposes. */
@@ -98,8 +95,4 @@ public class TestingResourceManager extends 
ResourceManager<ResourceID> {
         // cannot stop workers
         return false;
     }
-
-    <T> CompletableFuture<T> runInMainThread(Callable<T> callable, Time 
timeout) {
-        return callAsync(callable, timeout);
-    }
 }

Reply via email to