asfgit closed pull request #6699: [FLINK-10314] Making JobManagerRunner
creation non-blocking in Dispatcher
URL: https://github.com/apache/flink/pull/6699
This is a pull request merged from a forked repository. Because GitHub hides
the original diff of such a pull request once it has been merged, the diff is
reproduced below for the sake of provenance:
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
index 678ef9f78b6..83846a62b64 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
@@ -32,6 +32,19 @@ private FunctionUtils() {
throw new UnsupportedOperationException("This class should
never be instantiated.");
}
+ private static final Function<Object, Void> NULL_FN = ignored -> null;
+
+ /**
+ * Function which returns {@code null} (type: Void).
+ *
+ * @param <T> input type
+ * @return Function which returns {@code null}.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> Function<T, Void> nullFn() {
+ return (Function<T, Void>) NULL_FN;
+ }
+
/**
* Convert at {@link FunctionWithException} into a {@link Function}.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5279e502a93..a1da2131e48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -63,10 +63,11 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.util.function.FunctionUtils;
-import org.apache.flink.util.function.ThrowingConsumer;
-import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.flink.util.function.FunctionWithException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -110,7 +111,7 @@
private final FatalErrorHandler fatalErrorHandler;
- private final Map<JobID, JobManagerRunner> jobManagerRunners;
+ private final Map<JobID, CompletableFuture<JobManagerRunner>>
jobManagerRunnerFutures;
private final LeaderElectionService leaderElectionService;
@@ -166,7 +167,7 @@ public Dispatcher(
this.runningJobsRegistry =
highAvailabilityServices.getRunningJobsRegistry();
- jobManagerRunners = new HashMap<>(16);
+ jobManagerRunnerFutures = new HashMap<>(16);
leaderElectionService =
highAvailabilityServices.getDispatcherLeaderElectionService();
@@ -248,7 +249,7 @@ public void start() throws Exception {
return FutureUtils.completedExceptionally(new
FlinkException(String.format("Failed to retrieve job scheduling status for job
%s.", jobId), e));
}
- if (jobSchedulingStatus ==
RunningJobsRegistry.JobSchedulingStatus.DONE ||
jobManagerRunners.containsKey(jobId)) {
+ if (jobSchedulingStatus ==
RunningJobsRegistry.JobSchedulingStatus.DONE ||
jobManagerRunnerFutures.containsKey(jobId)) {
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobId,
String.format("Job has already been submitted and is in state %s.",
jobSchedulingStatus)));
} else {
@@ -257,58 +258,72 @@ public void start() throws Exception {
return persistAndRunFuture.exceptionally(
(Throwable throwable) -> {
+ final Throwable strippedThrowable =
ExceptionUtils.stripCompletionException(throwable);
+ log.error("Failed to submit job {}.",
jobId, strippedThrowable);
throw new CompletionException(
- new
JobSubmissionException(jobId, "Failed to submit job.", throwable));
+ new
JobSubmissionException(jobId, "Failed to submit job.", strippedThrowable));
});
}
}
- private void persistAndRunJob(JobGraph jobGraph) throws Exception {
+ private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph)
throws Exception {
submittedJobGraphStore.putJobGraph(new
SubmittedJobGraph(jobGraph, null));
- try {
- runJob(jobGraph);
- } catch (Exception e) {
- try {
+ final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
+
+ return
runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored,
Throwable throwable) -> {
+ if (throwable != null) {
submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
- } catch (Exception ie) {
- e.addSuppressed(ie);
}
-
- throw e;
- }
+ }));
}
- private void runJob(JobGraph jobGraph) throws Exception {
-
Preconditions.checkState(!jobManagerRunners.containsKey(jobGraph.getJobID()));
+ private CompletableFuture<Void> runJob(JobGraph jobGraph) {
+
Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
- final JobManagerRunner jobManagerRunner =
createJobManagerRunner(jobGraph);
+ final CompletableFuture<JobManagerRunner>
jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
- jobManagerRunner.start();
+ jobManagerRunnerFutures.put(jobGraph.getJobID(),
jobManagerRunnerFuture);
- jobManagerRunners.put(jobGraph.getJobID(), jobManagerRunner);
+ return jobManagerRunnerFuture
+ .thenApply(FunctionUtils.nullFn())
+ .whenCompleteAsync(
+ (ignored, throwable) -> {
+ if (throwable != null) {
+
jobManagerRunnerFutures.remove(jobGraph.getJobID());
+ }
+ },
+ getMainThreadExecutor());
}
- private JobManagerRunner createJobManagerRunner(JobGraph jobGraph)
throws Exception {
- final JobID jobId = jobGraph.getJobID();
-
- final JobManagerRunner jobManagerRunner =
jobManagerRunnerFactory.createJobManagerRunner(
- ResourceID.generate(),
- jobGraph,
- configuration,
- getRpcService(),
- highAvailabilityServices,
- heartbeatServices,
- blobServer,
- jobManagerSharedServices,
- new
DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
- fatalErrorHandler);
+ private CompletableFuture<JobManagerRunner>
createJobManagerRunner(JobGraph jobGraph) {
+ final RpcService rpcService = getRpcService();
+
+ final CompletableFuture<JobManagerRunner>
jobManagerRunnerFuture = CompletableFuture.supplyAsync(
+ CheckedSupplier.unchecked(() ->
+ jobManagerRunnerFactory.createJobManagerRunner(
+ ResourceID.generate(),
+ jobGraph,
+ configuration,
+ rpcService,
+ highAvailabilityServices,
+ heartbeatServices,
+ blobServer,
+ jobManagerSharedServices,
+ new
DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
+ fatalErrorHandler)),
+ rpcService.getExecutor());
+
+ return
jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
+ }
+ private JobManagerRunner startJobManagerRunner(JobManagerRunner
jobManagerRunner) throws Exception {
+ final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
jobManagerRunner.getResultFuture().whenCompleteAsync(
(ArchivedExecutionGraph archivedExecutionGraph,
Throwable throwable) -> {
// check if we are still the active
JobManagerRunner by checking the identity
//noinspection ObjectEquality
- if (jobManagerRunner ==
jobManagerRunners.get(jobId)) {
+ if (jobManagerRunner ==
jobManagerRunnerFutures.get(jobId).getNow(null)) {
if (archivedExecutionGraph != null) {
jobReachedGloballyTerminalState(archivedExecutionGraph);
} else {
@@ -325,13 +340,15 @@ private JobManagerRunner createJobManagerRunner(JobGraph
jobGraph) throws Except
}
}, getMainThreadExecutor());
+ jobManagerRunner.start();
+
return jobManagerRunner;
}
@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
return CompletableFuture.completedFuture(
- Collections.unmodifiableSet(new
HashSet<>(jobManagerRunners.keySet())));
+ Collections.unmodifiableSet(new
HashSet<>(jobManagerRunnerFutures.keySet())));
}
@Override
@@ -481,9 +498,9 @@ private JobManagerRunner createJobManagerRunner(JobGraph
jobGraph) throws Except
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time
timeout) {
- final JobManagerRunner jobManagerRunner =
jobManagerRunners.get(jobId);
+ final CompletableFuture<JobManagerRunner>
jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
- if (jobManagerRunner == null) {
+ if (jobManagerRunnerFuture == null) {
final ArchivedExecutionGraph archivedExecutionGraph =
archivedExecutionGraphStore.get(jobId);
if (archivedExecutionGraph == null) {
@@ -492,7 +509,7 @@ private JobManagerRunner createJobManagerRunner(JobGraph
jobGraph) throws Except
return
CompletableFuture.completedFuture(JobResult.createFrom(archivedExecutionGraph));
}
} else {
- return
jobManagerRunner.getResultFuture().thenApply(JobResult::createFrom);
+ return
jobManagerRunnerFuture.thenCompose(JobManagerRunner::getResultFuture).thenApply(JobResult::createFrom);
}
}
@@ -566,11 +583,11 @@ private void
registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableF
}
private CompletableFuture<Void> removeJob(JobID jobId, boolean
cleanupHA) {
- JobManagerRunner jobManagerRunner =
jobManagerRunners.remove(jobId);
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
jobManagerRunnerFutures.remove(jobId);
final CompletableFuture<Void> jobManagerRunnerTerminationFuture;
- if (jobManagerRunner != null) {
- jobManagerRunnerTerminationFuture =
jobManagerRunner.closeAsync();
+ if (jobManagerRunnerFuture != null) {
+ jobManagerRunnerTerminationFuture =
jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync);
} else {
jobManagerRunnerTerminationFuture =
CompletableFuture.completedFuture(null);
}
@@ -616,7 +633,7 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA)
{
private void terminateJobManagerRunners() {
log.info("Stopping all currently running jobs of dispatcher
{}.", getAddress());
- final HashSet<JobID> jobsToRemove = new
HashSet<>(jobManagerRunners.keySet());
+ final HashSet<JobID> jobsToRemove = new
HashSet<>(jobManagerRunnerFutures.keySet());
for (JobID jobId : jobsToRemove) {
removeJobAndRegisterTerminationFuture(jobId, false);
@@ -739,16 +756,16 @@ private void jobMasterFailed(JobID jobId, Throwable
cause) {
}
private CompletableFuture<JobMasterGateway>
getJobMasterGatewayFuture(JobID jobId) {
- final JobManagerRunner jobManagerRunner =
jobManagerRunners.get(jobId);
+ final CompletableFuture<JobManagerRunner>
jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
- if (jobManagerRunner == null) {
+ if (jobManagerRunnerFuture == null) {
return FutureUtils.completedExceptionally(new
FlinkJobNotFoundException(jobId));
} else {
- final CompletableFuture<JobMasterGateway>
leaderGatewayFuture = jobManagerRunner.getLeaderGatewayFuture();
+ final CompletableFuture<JobMasterGateway>
leaderGatewayFuture =
jobManagerRunnerFuture.thenCompose(JobManagerRunner::getLeaderGatewayFuture);
return leaderGatewayFuture.thenApplyAsync(
(JobMasterGateway jobMasterGateway) -> {
// check whether the retrieved
JobMasterGateway belongs still to a running JobMaster
- if
(jobManagerRunners.containsKey(jobId)) {
+ if
(jobManagerRunnerFutures.containsKey(jobId)) {
return jobMasterGateway;
} else {
throw new
CompletionException(new FlinkJobNotFoundException(jobId));
@@ -764,12 +781,12 @@ private void jobMasterFailed(JobID jobId, Throwable
cause) {
@Nonnull
private <T> List<CompletableFuture<Optional<T>>>
queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>>
queryFunction) {
- final int numberJobsRunning = jobManagerRunners.size();
+ final int numberJobsRunning = jobManagerRunnerFutures.size();
ArrayList<CompletableFuture<Optional<T>>>
optionalJobInformation = new ArrayList<>(
numberJobsRunning);
- for (JobID jobId : jobManagerRunners.keySet()) {
+ for (JobID jobId : jobManagerRunnerFutures.keySet()) {
final CompletableFuture<JobMasterGateway>
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
final CompletableFuture<Optional<T>> optionalRequest =
jobMasterGatewayFuture
@@ -836,10 +853,10 @@ public void grantLeadership(final UUID
newLeaderSessionID) {
log.debug("Dispatcher {} accepted leadership with
fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
setNewFencingToken(dispatcherId);
- Collection<CompletableFuture<Void>> runFutures = new
ArrayList<>(recoveredJobs.size());
+ Collection<CompletableFuture<?>> runFutures = new
ArrayList<>(recoveredJobs.size());
for (JobGraph recoveredJob : recoveredJobs) {
- final CompletableFuture<Void> runFuture =
waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob,
this::runJob);
+ final CompletableFuture<?> runFuture =
waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob,
this::runJob);
runFutures.add(runFuture);
}
@@ -850,7 +867,7 @@ public void grantLeadership(final UUID newLeaderSessionID) {
}
}
- private CompletableFuture<Void> waitForTerminatingJobManager(JobID
jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
+ private CompletableFuture<Void> waitForTerminatingJobManager(JobID
jobId, JobGraph jobGraph, FunctionWithException<JobGraph,
CompletableFuture<Void>, ?> action) {
final CompletableFuture<Void> jobManagerTerminationFuture =
getJobTerminationFuture(jobId)
.exceptionally((Throwable throwable) -> {
throw new CompletionException(
@@ -858,16 +875,16 @@ public void grantLeadership(final UUID
newLeaderSessionID) {
String.format("Termination of
previous JobManager for job %s failed. Cannot submit job under the same job
id.", jobId),
throwable)); });
- return jobManagerTerminationFuture.thenRunAsync(
- ThrowingRunnable.unchecked(() -> {
+ return jobManagerTerminationFuture.thenComposeAsync(
+ FunctionUtils.uncheckedFunction((ignored) -> {
jobManagerTerminationFutures.remove(jobId);
- action.accept(jobGraph);
+ return action.apply(jobGraph);
}),
getMainThreadExecutor());
}
CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
- if (jobManagerRunners.containsKey(jobId)) {
+ if (jobManagerRunnerFutures.containsKey(jobId)) {
return FutureUtils.completedExceptionally(new
DispatcherException(String.format("Job with job id %s is still running.",
jobId)));
} else {
return jobManagerTerminationFutures.getOrDefault(jobId,
CompletableFuture.completedFuture(null));
@@ -923,7 +940,7 @@ public void handleError(final Exception exception) {
public void onAddedJobGraph(final JobID jobId) {
runAsync(
() -> {
- if (!jobManagerRunners.containsKey(jobId)) {
+ if
(!jobManagerRunnerFutures.containsKey(jobId)) {
// IMPORTANT: onAddedJobGraph can
generate false positives and, thus, we must expect that
// the specified job is already removed
from the SubmittedJobGraphStore. In this case,
// SubmittedJobGraphStore.recoverJob
returns null.
@@ -962,7 +979,7 @@ public void onAddedJobGraph(final JobID jobId) {
private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph
jobGraph, DispatcherId dispatcherId) throws Exception {
if (leaderElectionService.hasLeadership(dispatcherId.toUUID()))
{
final JobID jobId = jobGraph.getJobID();
- if (jobManagerRunners.containsKey(jobId)) {
+ if (jobManagerRunnerFutures.containsKey(jobId)) {
// we must not release the job graph lock since
it can only be locked once and
// is currently being executed. Once we support
multiple locks, we must release
// the JobGraph here
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 335199a2807..c825451d946 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -163,7 +163,7 @@ private HATestingDispatcher
createHADispatcher(TestingHighAvailabilityServices h
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
null,
new MemoryArchivedExecutionGraphStore(),
- new TestingJobManagerRunnerFactory(new
CompletableFuture<>(), new CompletableFuture<>()),
+ new TestingJobManagerRunnerFactory(new
CompletableFuture<>(), new CompletableFuture<>(),
CompletableFuture.completedFuture(null)),
testingFatalErrorHandler,
fencingTokens);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index d09ab8df728..5c4ac34a314 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -29,26 +29,19 @@
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TestingBlobStore;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
-import
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -79,8 +72,6 @@
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
* Tests the resource cleanup by the {@link Dispatcher}.
@@ -188,7 +179,7 @@ public void setup() throws Exception {
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
null,
new MemoryArchivedExecutionGraphStore(),
- new TestingJobManagerRunnerFactory(resultFuture,
terminationFuture),
+ new TestingJobManagerRunnerFactory(new
CompletableFuture<>(), resultFuture, terminationFuture),
fatalErrorHandler);
dispatcher.start();
@@ -447,28 +438,6 @@ public void
testHABlobsAreRemovedIfHAJobGraphRemovalSucceeds() throws Exception
assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
}
- private static final class TestingJobManagerRunnerFactory implements
Dispatcher.JobManagerRunnerFactory {
-
- private final CompletableFuture<ArchivedExecutionGraph>
resultFuture;
-
- private final CompletableFuture<Void> terminationFuture;
-
- private
TestingJobManagerRunnerFactory(CompletableFuture<ArchivedExecutionGraph>
resultFuture, CompletableFuture<Void> terminationFuture) {
- this.resultFuture = resultFuture;
- this.terminationFuture = terminationFuture;
- }
-
- @Override
- public JobManagerRunner createJobManagerRunner(ResourceID
resourceId, JobGraph jobGraph, Configuration configuration, RpcService
rpcService, HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices, BlobServer blobServer,
JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory
jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) {
- final JobManagerRunner jobManagerRunnerMock =
mock(JobManagerRunner.class);
-
-
when(jobManagerRunnerMock.getResultFuture()).thenReturn(resultFuture);
-
when(jobManagerRunnerMock.closeAsync()).thenReturn(terminationFuture);
-
- return jobManagerRunnerMock;
- }
- }
-
private static final class TestingBlobServer extends BlobServer {
private final CompletableFuture<JobID> cleanupJobFuture;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 1af10b8c598..24426761b2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.Checkpoints;
@@ -64,6 +65,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.After;
import org.junit.AfterClass;
@@ -86,7 +88,9 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -144,6 +148,10 @@
/** Instance under test. */
private TestingDispatcher dispatcher;
+ private TestingHighAvailabilityServices haServices;
+
+ private HeartbeatServices heartbeatServices;
+
@BeforeClass
public static void setupClass() {
rpcService = new TestingRpcService();
@@ -166,13 +174,13 @@ public void setUp() throws Exception {
jobGraph.setAllowQueuedScheduling(true);
fatalErrorHandler = new TestingFatalErrorHandler();
- final HeartbeatServices heartbeatServices = new
HeartbeatServices(1000L, 10000L);
+ heartbeatServices = new HeartbeatServices(1000L, 10000L);
submittedJobGraphStore = new FaultySubmittedJobGraphStore();
dispatcherLeaderElectionService = new
TestingLeaderElectionService();
jobMasterLeaderElectionService = new
TestingLeaderElectionService();
- final TestingHighAvailabilityServices haServices = new
TestingHighAvailabilityServices();
+ haServices = new TestingHighAvailabilityServices();
haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
haServices.setJobMasterLeaderElectionService(TEST_JOB_ID,
jobMasterLeaderElectionService);
@@ -188,14 +196,18 @@ public void setUp() throws Exception {
createdJobManagerRunnerLatch = new CountDownLatch(2);
blobServer = new BlobServer(configuration, new VoidBlobStore());
+ }
- dispatcher = createDispatcher(heartbeatServices, haServices);
-
+ @Nonnull
+ private TestingDispatcher createAndStartDispatcher(HeartbeatServices
heartbeatServices, TestingHighAvailabilityServices haServices,
Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+ final TestingDispatcher dispatcher =
createDispatcher(heartbeatServices, haServices, jobManagerRunnerFactory);
dispatcher.start();
+
+ return dispatcher;
}
@Nonnull
- private TestingDispatcher createDispatcher(HeartbeatServices
heartbeatServices, TestingHighAvailabilityServices haServices) throws Exception
{
+ private TestingDispatcher createDispatcher(HeartbeatServices
heartbeatServices, TestingHighAvailabilityServices haServices,
Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
return new TestingDispatcher(
rpcService,
Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
@@ -207,7 +219,7 @@ private TestingDispatcher
createDispatcher(HeartbeatServices heartbeatServices,
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
null,
new MemoryArchivedExecutionGraphStore(),
- new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch),
+ jobManagerRunnerFactory,
fatalErrorHandler);
}
@@ -216,7 +228,13 @@ public void tearDown() throws Exception {
try {
fatalErrorHandler.rethrowError();
} finally {
- RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+ if (dispatcher != null) {
+ RpcUtils.terminateRpcEndpoint(dispatcher,
TIMEOUT);
+ }
+ }
+
+ if (haServices != null) {
+ haServices.closeAndCleanupAllData();
}
if (blobServer != null) {
@@ -230,6 +248,8 @@ public void tearDown() throws Exception {
*/
@Test
public void testJobSubmission() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+
CompletableFuture<UUID> leaderFuture =
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
// wait for the leader to be elected
@@ -251,6 +271,8 @@ public void testJobSubmission() throws Exception {
*/
@Test
public void testLeaderElection() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+
CompletableFuture<Void> jobIdsFuture = new
CompletableFuture<>();
submittedJobGraphStore.setJobIdsFunction(
(Collection<JobID> jobIds) -> {
@@ -270,6 +292,8 @@ public void testLeaderElection() throws Exception {
*/
@Test
public void testSubmittedJobGraphListener() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -294,6 +318,8 @@ public void testSubmittedJobGraphListener() throws
Exception {
@Test
public void testOnAddedJobGraphRecoveryFailure() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+
final FlinkException expectedFailure = new
FlinkException("Expected failure");
submittedJobGraphStore.setRecoveryFailure(expectedFailure);
@@ -313,6 +339,8 @@ public void testOnAddedJobGraphRecoveryFailure() throws
Exception {
@Test
public void testOnAddedJobGraphWithFinishedJob() throws Throwable {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
submittedJobGraphStore.putJobGraph(new
SubmittedJobGraph(jobGraph, null));
@@ -333,6 +361,8 @@ public void testOnAddedJobGraphWithFinishedJob() throws
Throwable {
*/
@Test
public void testCacheJobExecutionResult() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -358,6 +388,8 @@ public void testCacheJobExecutionResult() throws Exception {
@Test
public void testThrowExceptionIfJobExecutionResultNotFound() throws
Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -374,6 +406,8 @@ public void
testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
*/
@Test
public void testJobRecovery() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
// elect the initial dispatcher as the leader
@@ -413,6 +447,8 @@ public void testSavepointDisposal() throws Exception {
final URI externalPointer = createTestingSavepoint();
final Path savepointPath = Paths.get(externalPointer);
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
@@ -447,7 +483,9 @@ private URI createTestingSavepoint() throws IOException,
URISyntaxException {
* to it. See FLINK-8887.
*/
@Test
- public void testWaitingForJobMasterLeadership() throws
ExecutionException, InterruptedException {
+ public void testWaitingForJobMasterLeadership() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
@@ -476,6 +514,8 @@ public void testWaitingForJobMasterLeadership() throws
ExecutionException, Inter
*/
@Test
public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+
final FlinkException testException = new FlinkException("Test
exception");
submittedJobGraphStore.setJobIdsFunction(
(Collection<JobID> jobIds) -> {
@@ -500,6 +540,8 @@ public void testFatalErrorAfterJobIdRecoveryFailure()
throws Exception {
public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
final FlinkException testException = new FlinkException("Test
exception");
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+
final SubmittedJobGraph submittedJobGraph = new
SubmittedJobGraph(jobGraph, null);
submittedJobGraphStore.putJobGraph(submittedJobGraph);
@@ -526,6 +568,8 @@ public void testFatalErrorAfterJobRecoveryFailure() throws
Exception {
public void testJobSubmissionErrorAfterJobRecovery() throws Exception {
final FlinkException testException = new FlinkException("Test
exception");
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+
final JobGraph failingJobGraph =
createFailingJobGraph(testException);
final SubmittedJobGraph submittedJobGraph = new
SubmittedJobGraph(failingJobGraph, null);
@@ -540,6 +584,102 @@ public void testJobSubmissionErrorAfterJobRecovery()
throws Exception {
fatalErrorHandler.clearError();
}
+ /**
+ * Tests that a blocking {@link JobManagerRunner} creation, e.g. due to
blocking FileSystem access,
+ * does not block the {@link Dispatcher}.
+ *
+ * <p>See FLINK-10314
+ */
+ @Test
+ public void testBlockingJobManagerRunner() throws Exception {
+ final OneShotLatch jobManagerRunnerCreationLatch = new
OneShotLatch();
+ dispatcher = createAndStartDispatcher(
+ heartbeatServices,
+ haServices,
+ new
BlockingJobManagerRunnerFactory(jobManagerRunnerCreationLatch::await));
+
+
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+ final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ final CompletableFuture<Acknowledge> submissionFuture =
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+
+ assertThat(submissionFuture.isDone(), is(false));
+
+ final CompletableFuture<Collection<String>>
metricQueryServicePathsFuture =
dispatcherGateway.requestMetricQueryServicePaths(Time.seconds(5L));
+
+ assertThat(metricQueryServicePathsFuture.get(), is(empty()));
+
+ assertThat(submissionFuture.isDone(), is(false));
+
+ jobManagerRunnerCreationLatch.trigger();
+
+ submissionFuture.get();
+ }
+
+ /**
+ * Tests that a failing {@link JobManagerRunner} will be properly
cleaned up.
+ */
+ @Test
+ public void testFailingJobManagerRunnerCleanup() throws Exception {
+ final FlinkException testException = new FlinkException("Test
exception.");
+ final ArrayBlockingQueue<Optional<Exception>> queue = new
ArrayBlockingQueue<>(2);
+
+ dispatcher = createAndStartDispatcher(
+ heartbeatServices,
+ haServices,
+ new BlockingJobManagerRunnerFactory(() -> {
+ final Optional<Exception> take = queue.take();
+ final Exception exception = take.orElse(null);
+
+ if (exception != null) {
+ throw exception;
+ }
+ }));
+
+
dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+ final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ CompletableFuture<Acknowledge> submissionFuture =
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+
+ assertThat(submissionFuture.isDone(), is(false));
+
+ queue.offer(Optional.of(testException));
+
+ try {
+ submissionFuture.get();
+ fail("Should fail because we could not instantiate the
JobManagerRunner.");
+ } catch (Exception e) {
+ assertThat(ExceptionUtils.findThrowable(e, t ->
t.equals(testException)).isPresent(), is(true));
+ }
+
+ submissionFuture = dispatcherGateway.submitJob(jobGraph,
TIMEOUT);
+
+ queue.offer(Optional.empty());
+
+ submissionFuture.get();
+ }
+
+ private final class BlockingJobManagerRunnerFactory extends
TestingJobManagerRunnerFactory {
+
+ @Nonnull
+ private final ThrowingRunnable<Exception>
jobManagerRunnerCreationLatch;
+
+ BlockingJobManagerRunnerFactory(@Nonnull
ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
+ super(new CompletableFuture<>(), new
CompletableFuture<>(), CompletableFuture.completedFuture(null));
+
+ this.jobManagerRunnerCreationLatch =
jobManagerRunnerCreationLatch;
+ }
+
+ @Override
+ public JobManagerRunner createJobManagerRunner(ResourceID
resourceId, JobGraph jobGraph, Configuration configuration, RpcService
rpcService, HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices, BlobServer blobServer,
JobManagerSharedServices jobManagerSharedServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler) throws Exception {
+ jobManagerRunnerCreationLatch.run();
+
+ return super.createJobManagerRunner(resourceId,
jobGraph, configuration, rpcService, highAvailabilityServices,
heartbeatServices, blobServer, jobManagerSharedServices,
jobManagerJobMetricGroupFactory, fatalErrorHandler);
+ }
+ }
+
private void electDispatcher() {
UUID expectedLeaderSessionId = UUID.randomUUID();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 39c06f36ecf..eed23ed7179 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -125,7 +125,7 @@ public void setup() throws Exception {
jobGraphFuture = new CompletableFuture<>();
resultFuture = new CompletableFuture<>();
- testingJobManagerRunnerFactory = new
TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture);
+ testingJobManagerRunnerFactory = new
TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture,
CompletableFuture.completedFuture(null));
}
@After
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index f9be8883589..992f08713c6 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -40,14 +40,16 @@
* {@link
org.apache.flink.runtime.dispatcher.Dispatcher.JobManagerRunnerFactory}
implementation for
* testing purposes.
*/
-final class TestingJobManagerRunnerFactory implements
Dispatcher.JobManagerRunnerFactory {
+class TestingJobManagerRunnerFactory implements
Dispatcher.JobManagerRunnerFactory {
private final CompletableFuture<JobGraph> jobGraphFuture;
private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
+ private final CompletableFuture<Void> terminationFuture;
- TestingJobManagerRunnerFactory(CompletableFuture<JobGraph>
jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture) {
+ TestingJobManagerRunnerFactory(CompletableFuture<JobGraph>
jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture,
CompletableFuture<Void> terminationFuture) {
this.jobGraphFuture = jobGraphFuture;
this.resultFuture = resultFuture;
+ this.terminationFuture = terminationFuture;
}
@Override
@@ -66,7 +68,8 @@ public JobManagerRunner createJobManagerRunner(
final JobManagerRunner mock = mock(JobManagerRunner.class);
when(mock.getResultFuture()).thenReturn(resultFuture);
-
when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+ when(mock.closeAsync()).thenReturn(terminationFuture);
+ when(mock.getJobGraph()).thenReturn(jobGraph);
return mock;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
index b5662c064e8..9c23f9d7aa2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -154,7 +154,7 @@ public void testSubmittedJobGraphRelease() throws Exception
{
final TestingDispatcher dispatcher = createDispatcher(
testingHighAvailabilityServices,
- new TestingJobManagerRunnerFactory(new
CompletableFuture<>(), new CompletableFuture<>()));
+ new TestingJobManagerRunnerFactory(new
CompletableFuture<>(), new CompletableFuture<>(),
CompletableFuture.completedFuture(null)));
dispatcher.start();
@@ -223,11 +223,11 @@ public void testStandbyDispatcherJobExecution() throws
Exception {
final CompletableFuture<ArchivedExecutionGraph>
resultFuture = new CompletableFuture<>();
final TestingDispatcher dispatcher1 = createDispatcher(
haServices1,
- new
TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture));
+ new
TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture,
CompletableFuture.completedFuture(null)));
final TestingDispatcher dispatcher2 = createDispatcher(
haServices2,
- new TestingJobManagerRunnerFactory(new
CompletableFuture<>(), new CompletableFuture<>()));
+ new TestingJobManagerRunnerFactory(new
CompletableFuture<>(), new CompletableFuture<>(),
CompletableFuture.completedFuture(null)));
try {
dispatcher1.start();
@@ -285,11 +285,11 @@ public void testStandbyDispatcherJobRecovery() throws
Exception {
final CompletableFuture<JobGraph>
jobGraphFuture1 = new CompletableFuture<>();
dispatcher1 = createDispatcher(
haServices,
- new
TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>()));
+ new
TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>(),
CompletableFuture.completedFuture(null)));
final CompletableFuture<JobGraph>
jobGraphFuture2 = new CompletableFuture<>();
dispatcher2 = createDispatcher(
haServices,
- new
TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>()));
+ new
TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>(),
CompletableFuture.completedFuture(null)));
dispatcher1.start();
dispatcher2.start();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index c38ea5d99c0..950a4e13674 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -298,7 +298,7 @@ public void disconnectJobManager(JobID jobId, Exception
cause) {
@Override
public CompletableFuture<ResourceOverview> requestResourceOverview(Time
timeout) {
- return FutureUtils.completedExceptionally(new
UnsupportedOperationException("Not yet implemented"));
+ return CompletableFuture.completedFuture(new
ResourceOverview(1, 1, 1));
}
@Override
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services