tillrohrmann commented on a change in pull request #13217:
URL: https://github.com/apache/flink/pull/13217#discussion_r478403026
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
##########
@@ -351,4 +352,53 @@ public static ArchivedExecutionGraph
createFrom(ExecutionGraph executionGraph) {
executionGraph.getCheckpointStatsSnapshot(),
executionGraph.getStateBackendName().orElse(null));
}
+
+ /**
+ * Create a sparse ArchivedExecutionGraph for a job while it is still
initializing.
+ * Most fields will be empty, only job status and error-related fields
are set.
+ */
+ public static ArchivedExecutionGraph createFromInitializingJob(
+ JobID jobId,
+ String jobName,
+ JobStatus jobStatus,
+ @Nullable Throwable throwable,
+ long initializationTimestamp) {
+
+ long failureTime = System.currentTimeMillis();
Review comment:
Move this into the if branch.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static junit.framework.Assert.fail;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final Time TIMEOUT = Time.seconds(10L);
+ private static final JobID TEST_JOB_ID = new JobID();
+
+ @Test
+ public void testStatusWhenInitializing() throws
+ Exception {
Review comment:
nit: this is purely a matter of personal taste, but I would keep
`Exception` on the same line as `throws`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static junit.framework.Assert.fail;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final Time TIMEOUT = Time.seconds(10L);
+ private static final JobID TEST_JOB_ID = new JobID();
+
+ @Test
+ public void testStatusWhenInitializing() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+ Assert.assertThat(dispatcherJob.getResultFuture().isDone(),
is(false));
+ assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+ }
+
+ @Test
+ public void testStatusWhenRunning() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // finish initialization
+ testContext.setRunning();
+
+ assertJobStatus(dispatcherJob, JobStatus.RUNNING);
+
+ // result future not done
+ Assert.assertThat(dispatcherJob.getResultFuture().isDone(),
is(false));
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ }
+
+ @Test
+ public void testStatusWhenJobFinished() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // finish job
+ testContext.setRunning();
+ testContext.finishJob();
+
+ assertJobStatus(dispatcherJob, JobStatus.FINISHED);
+
+ // assert result future done
+ ArchivedExecutionGraph aeg =
dispatcherJob.getResultFuture().get();
+
+ Assert.assertThat(aeg.getState(), is(JobStatus.FINISHED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileInitializing() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+ assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+
+ CompletableFuture<Acknowledge> cancelFuture =
dispatcherJob.cancel(
+ TIMEOUT);
+
+ Assert.assertThat(cancelFuture.isDone(), is(false));
+ Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+
+ testContext.setRunning();
+ testContext.finishCancellation();
+
+ // assert that cancel future completes
+ cancelFuture.get();
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ // assert that the result future completes
+
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(),
is(JobStatus.CANCELED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileRunning() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ testContext.setRunning();
+ CompletableFuture<Acknowledge> cancelFuture =
dispatcherJob.cancel(TIMEOUT);
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+ testContext.finishCancellation();
+
+ cancelFuture.get();
+ assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(),
is(JobStatus.CANCELED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileFailed() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ RuntimeException exception = new RuntimeException("Artificial
failure in runner initialization");
+ testContext.failInitialization(exception);
+
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+ CommonTestUtils.assertThrows("Artificial failure",
ExecutionException.class, () -> dispatcherJob.cancel(TIMEOUT).get());
+
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+ }
+
+ @Test
+ public void testErrorWhileInitializing() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.dispatcherJob;
+
+ // now fail
+ RuntimeException exception = new RuntimeException("Artificial
failure in runner initialization");
+ testContext.failInitialization(exception);
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+ ArchivedExecutionGraph aeg =
dispatcherJob.getResultFuture().get();
+
Assert.assertThat(aeg.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()),
is(exception));
+
+ Assert.assertTrue(dispatcherJob.closeAsync().isDone() &&
dispatcherJob.closeAsync().isCompletedExceptionally());
+ }
+
+ @Test
+ public void testCloseWhileInitializingSuccessfully() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ CompletableFuture<Void> closeFuture =
dispatcherJob.closeAsync();
+ Assert.assertThat(closeFuture.isDone(), is(false));
+
+ // set job running, so that we can cancel it
+ testContext.setRunning();
+
+ // assert future completes now
+ closeFuture.get();
+
+ // ensure the result future is complete (how it completes is up
to the JobManager)
+ CompletableFuture<ArchivedExecutionGraph> resultFuture =
dispatcherJob.getResultFuture();
+ CommonTestUtils.assertThrows("has not been finished",
ExecutionException.class,
+ resultFuture::get);
+ }
+
+ @Test
+ public void testCloseWhileInitializingErroneously() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ CompletableFuture<Void> closeFuture =
dispatcherJob.closeAsync();
+ Assert.assertThat(closeFuture.isDone(), is(false));
+
+ testContext.failInitialization(new RuntimeException("fail"));
+
+ // assert future completes now
+ CommonTestUtils.assertThrows("fail", ExecutionException.class,
+ closeFuture::get);
+
+ // ensure the result future is complete
+
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(),
is(JobStatus.FAILED));
+ }
+
+ @Test
+ public void testCloseWhileInitializingErroneouslyForRecovery() {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.RECOVERY);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ CompletableFuture<Void> closeFuture =
dispatcherJob.closeAsync();
+
+ testContext.failInitialization(new RuntimeException("fail"));
+
+ CommonTestUtils.assertThrows("fail", ExecutionException.class,
+ closeFuture::get);
+ // ensure the result future is completing exceptionally when
using RECOVERY execution
+ CommonTestUtils.assertThrows("fail", ExecutionException.class,
+ () -> dispatcherJob.getResultFuture().get());
+ }
+
+ @Test
+ public void testCloseWhileRunning() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // complete JobManager runner future to indicate to the
DispatcherJob that the Runner has been initialized
+ testContext.setRunning();
+
+ CompletableFuture<Void> closeFuture =
dispatcherJob.closeAsync();
+
+ closeFuture.get();
+
+ // result future should complete exceptionally.
+ CompletableFuture<ArchivedExecutionGraph> resultFuture =
dispatcherJob.getResultFuture();
+ CommonTestUtils.assertThrows("has not been finished",
ExecutionException.class,
+ resultFuture::get);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testUnavailableJobMasterGateway() {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+ dispatcherJob.getJobMasterGateway();
+ }
+
+ private TestContext createTestContext(Dispatcher.ExecutionType type) {
+ final JobVertex testVertex = new JobVertex("testVertex");
+ testVertex.setInvokableClass(NoOpInvokable.class);
+
+ JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob",
testVertex);
+ CompletableFuture<JobManagerRunner>
jobManagerRunnerCompletableFuture = new CompletableFuture<>();
+ DispatcherJob dispatcherJob =
DispatcherJob.createFor(jobManagerRunnerCompletableFuture,
+ jobGraph.getJobID(), jobGraph.getName(), type);
+
+ return new TestContext(
+ jobManagerRunnerCompletableFuture,
+ dispatcherJob,
+ jobGraph);
+ }
+
+ private static class TestContext {
+ private final CompletableFuture<JobManagerRunner>
jobManagerRunnerCompletableFuture;
+ private final DispatcherJob dispatcherJob;
+ private final JobGraph jobGraph;
+ private final TestingJobMasterGateway
mockRunningJobMasterGateway;
+ private final CompletableFuture<Acknowledge> cancellationFuture;
+
+ private JobStatus internalJobStatus = JobStatus.INITIALIZING;
+
+ public TestContext(
+ CompletableFuture<JobManagerRunner>
jobManagerRunnerCompletableFuture,
+ DispatcherJob dispatcherJob,
+ JobGraph jobGraph) {
+ this.jobManagerRunnerCompletableFuture =
jobManagerRunnerCompletableFuture;
+ this.dispatcherJob = dispatcherJob;
+ this.jobGraph = jobGraph;
+
+ this.cancellationFuture = new CompletableFuture<>();
+ this.mockRunningJobMasterGateway = new
TestingJobMasterGatewayBuilder()
+ .setRequestJobSupplier(() ->
CompletableFuture.completedFuture(ArchivedExecutionGraph.createFromInitializingJob(getJobID(),
"test", internalJobStatus, null, 1337)))
+ .setRequestJobDetailsSupplier(() -> {
+ JobDetails jobDetails = new
JobDetails(getJobID(), "", 0, 0, 0, internalJobStatus, 0,
+ new int[]{0, 0, 0, 0, 0, 0, 0,
0, 0}, 0);
+ return
CompletableFuture.completedFuture(jobDetails);
+ })
+ // once JobManagerRunner is initialized,
complete result future with CANCELLED AEG and ack cancellation.
+ .setCancelFunction(() -> {
+ internalJobStatus =
JobStatus.CANCELLING;
+ return cancellationFuture;
+ })
+ .build();
+ }
+
+ public JobID getJobID() {
+ return jobGraph.getJobID();
+ }
+
+ public void failInitialization(Throwable ex) {
+
jobManagerRunnerCompletableFuture.completeExceptionally(ex);
+ }
+
+ public DispatcherJob getDispatcherJob() {
+ return dispatcherJob;
+ }
+
+ public void setRunning() {
+ internalJobStatus = JobStatus.RUNNING;
+ TestingJobManagerRunner jobManagerRunner =
+ new TestingJobManagerRunner(getJobID(), false);
+
jobManagerRunner.getJobMasterGateway().complete(mockRunningJobMasterGateway);
+
jobManagerRunnerCompletableFuture.complete(jobManagerRunner);
+ }
+
+ public void finishJob() {
+ try {
+ internalJobStatus = JobStatus.FINISHED;
+
jobManagerRunnerCompletableFuture.get().getResultFuture()
Review comment:
`CompletableFuture.join()` throws an unchecked exception (`CompletionException`)
instead of the checked exceptions thrown by `get()`, so using it here would let you drop the `try`/`catch`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -544,10 +705,13 @@ public void
testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
.build();
dispatcher.start();
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
- final CompletableFuture<Acknowledge> submissionFuture =
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
- submissionFuture.get();
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+ CommonTestUtils.waitUntilJobManagerIsInitialized(() ->
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get());
Review comment:
Why do we have to wait for this here?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -329,6 +345,125 @@ public void
testJobSubmissionWithPartialResourceConfigured() throws Exception {
}
}
+ @Test(timeout = 5_000L)
+ public void testNonBlockingJobSubmission() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // create a job graph of a job that blocks forever
+ final BlockingJobVertex blockingJobVertex = new
BlockingJobVertex("testVertex");
+ blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+ JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID,
"blockingTestJob", blockingJobVertex);
+
+ dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+ // ensure INITIALIZING status from status
+ CompletableFuture<JobStatus> jobStatusFuture =
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+ Assert.assertEquals(JobStatus.INITIALIZING,
jobStatusFuture.get());
+
+ // ensure correct JobDetails
+ MultipleJobsDetails multiDetails =
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+ Assert.assertEquals(1, multiDetails.getJobs().size());
+ Assert.assertEquals(blockingJobGraph.getJobID(),
multiDetails.getJobs().iterator().next().getJobId());
+
+ // submission has succeeded, let the initialization finish.
+ blockingJobVertex.unblock();
+
+ // wait till job is running
+ CommonTestUtils.waitUntilJobManagerIsInitialized(() ->
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT).get());
+
+ dispatcherGateway.cancelJob(blockingJobGraph.getJobID(),
TIMEOUT).get();
+ }
+
+ @Test(timeout = 5_000L)
+ public void testInvalidCallDuringInitialization() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // create a job graph of a job that blocks forever
+ final BlockingJobVertex blockingJobVertex = new
BlockingJobVertex("testVertex");
+ blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+ JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID,
"blockingTestJob", blockingJobVertex);
+
+ dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+ // ensure INITIALIZING status
+ CompletableFuture<JobStatus> jobStatusFuture =
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+ Assert.assertEquals(JobStatus.INITIALIZING,
jobStatusFuture.get());
+
+ // this call is supposed to fail
+ boolean exceptionSeen = false;
Review comment:
By adding a `fail()` call right after `dispatcherGateway.triggerSavepoint(...)`, we could get rid of
the `exceptionSeen` flag.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
##########
@@ -31,11 +31,11 @@
/**
* {@link Dispatcher} implementation used for testing purposes.
*/
-class TestingDispatcher extends Dispatcher {
+public class TestingDispatcher extends Dispatcher {
Review comment:
Please revert this visibility change (back to package-private) once `ClientUtilsTest` has been updated.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+ private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+ private final CompletableFuture<JobManagerRunner>
jobManagerRunnerFuture;
+ private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+ private final CompletableFuture<Void> terminationFuture = new
CompletableFuture<>();
+
+ private final long initializationTimestamp;
+ private final JobID jobId;
+ private final String jobName;
+
+ private final Object lock = new Object();
+
+ // internal field to track job status during initialization. Is not
updated anymore after
+ // job is initialized, cancelled or failed.
+ @GuardedBy("lock")
+ private DispatcherJobStatus jobStatus =
DispatcherJobStatus.INITIALIZING;
+
+ private enum DispatcherJobStatus {
+ // We are waiting for the JobManagerRunner to be initialized
+ INITIALIZING(JobStatus.INITIALIZING),
+ // JobManagerRunner is initialized
+ JOB_MANAGER_RUNNER_INITIALIZED(null),
+ // waiting for cancellation. We stay in this status until the
job result future completed,
+ // then we consider the JobManager to be initialized.
+ CANCELLING(JobStatus.CANCELLING);
+
+ @Nullable
+ private final JobStatus jobStatus;
+
+ DispatcherJobStatus(JobStatus jobStatus) {
+ this.jobStatus = jobStatus;
+ }
+
+ public JobStatus asJobStatus() {
+ if (jobStatus == null) {
+ throw new IllegalStateException("This state is
not defined as a 'JobStatus'");
+ }
+ return jobStatus;
+ }
+ }
+
+ static DispatcherJob createFor(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ Dispatcher.ExecutionType executionType) {
+ return new DispatcherJob(jobManagerRunnerFuture, jobId,
jobName, executionType);
+ }
+
+ private DispatcherJob(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ Dispatcher.ExecutionType executionType) {
+ this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+ this.jobId = jobId;
+ this.jobName = jobName;
+ this.initializationTimestamp = System.currentTimeMillis();
+ this.jobResultFuture = new CompletableFuture<>();
+
+
FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner,
throwable) -> {
+ // JM has been initialized, or the initialization failed
+ synchronized (lock) {
+ if (jobStatus !=
DispatcherJobStatus.CANCELLING) {
+ jobStatus =
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+ }
+ if (throwable == null) {
+ // Forward result future
+
FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+ } else { // failure during initialization
+ if (executionType ==
Dispatcher.ExecutionType.RECOVERY) {
+
jobResultFuture.completeExceptionally(throwable);
+ } else {
+
jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+ jobId,
+ jobName,
+ JobStatus.FAILED,
+ throwable,
+
initializationTimestamp));
+ }
+ }
+ }
+ return null;
+ }));
+ }
+
+ public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+ return jobResultFuture;
+ }
+
+ public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+ return requestJobStatus(timeout).thenApply(status -> {
+ int[] tasksPerState = new
int[ExecutionState.values().length];
+ synchronized (lock) {
+ return new JobDetails(
+ jobId,
+ jobName,
+ initializationTimestamp,
+ 0,
+ 0,
+ status,
+ 0,
+ tasksPerState,
+ 0);
+ }
+ });
+ }
+
+ /**
+ * Cancel job.
+ * A cancellation will be scheduled if the initialization is not
completed.
+ * The returned future will complete exceptionally if the
JobManagerRunner initialization failed.
+ */
+ public CompletableFuture<Acknowledge> cancel(Time timeout) {
+ synchronized (lock) {
+ if (isInitialized()) {
+ return
getJobMasterGateway().thenCompose(jobMasterGateway ->
jobMasterGateway.cancel(timeout));
+ } else {
+ log.info("Cancellation during initialization
requested for job {}. Job will be cancelled once JobManager has been
initialized.", jobId);
+
+ // cancel job
+ jobManagerRunnerFuture
+
.thenCompose(JobManagerRunner::getJobMasterGateway)
+ .thenCompose(jobMasterGateway ->
jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT))
+ .whenComplete((ignored,
cancelThrowable) -> {
+ if (cancelThrowable != null) {
+ log.warn("Cancellation
of job {} failed", jobId, cancelThrowable);
+ }
+ });
+ jobStatus = DispatcherJobStatus.CANCELLING;
+
jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> {
+ if (archivedExecutionGraph != null) {
+ jobStatus =
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+ }
+ }));
+ return jobResultFuture.thenApply(ignored ->
Acknowledge.get());
+ }
+ }
+ }
+
+ public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+ return
requestJob(timeout).thenApply(ArchivedExecutionGraph::getState);
+ }
+
+ /**
+ * Returns a future completing to the ArchivedExecutionGraph of the job.
+ */
+ public CompletableFuture<ArchivedExecutionGraph> requestJob(Time
timeout) {
+ synchronized (lock) {
+ if (isInitialized()) {
+ if (jobResultFuture.isDone()) { // job is not
running anymore
+ return jobResultFuture;
+ }
+ return
getJobMasterGateway().thenCompose(jobMasterGateway ->
jobMasterGateway.requestJob(
+ timeout));
+ } else {
+ Preconditions.checkState(this.jobStatus ==
DispatcherJobStatus.INITIALIZING || jobStatus ==
DispatcherJobStatus.CANCELLING);
+ return CompletableFuture.completedFuture(
+
ArchivedExecutionGraph.createFromInitializingJob(
+ jobId,
+ jobName,
+ jobStatus.asJobStatus(),
+ null,
+ initializationTimestamp));
+ }
+ }
+ }
+
+ /**
+ * The job is initialized once the JobManager runner has been
initialized.
+ * It is also initialized if the runner initialization failed, or of it
has been
+ * canceled (and the cancellation is complete).
+ */
+ public boolean isInitialized() {
+ synchronized (lock) {
+ return jobStatus ==
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+ }
+ }
+
+ /**
+ * Returns the {@link JobMasterGateway} from the JobManagerRunner.
+ * This method will fail with an {@link IllegalStateException} if the
job is initialized.
+ * The returned future will complete exceptionally if the
JobManagerRunner initialization failed.
+ * @return the {@link JobMasterGateway}.
+ */
+ public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
+ Preconditions.checkState(
+ isInitialized(),
+ "JobMaster Gateway is not available during
initialization");
+ return
jobManagerRunnerFuture.thenCompose(JobManagerRunner::getJobMasterGateway);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+
FutureUtils.assertNoException(jobManagerRunnerFuture.handle((runner, throwable)
-> {
+ if (throwable == null) {
+ // init was successful: close jobManager runner.
+ CompletableFuture<Void> jobManagerRunnerClose =
jobManagerRunnerFuture.thenCompose(
+ AutoCloseableAsync::closeAsync);
+ FutureUtils.forward(jobManagerRunnerClose,
terminationFuture);
+ } else {
+ // initialization has failed: forward failure.
+
terminationFuture.completeExceptionally(throwable);
Review comment:
I might have given you bad advice here. A failed initialization is not an
abnormal condition for the `Dispatcher`, so forwarding it into the termination future
could cause a failure to be reported when the `Dispatcher` shuts down. I think it is fine
to treat a failed JobManager initialization as an acceptable condition and,
hence, to simply complete the `terminationFuture` normally here.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -329,6 +345,125 @@ public void
testJobSubmissionWithPartialResourceConfigured() throws Exception {
}
}
+ @Test(timeout = 5_000L)
+ public void testNonBlockingJobSubmission() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // create a job graph of a job that blocks forever
+ final BlockingJobVertex blockingJobVertex = new
BlockingJobVertex("testVertex");
+ blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+ JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID,
"blockingTestJob", blockingJobVertex);
+
+ dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+ // ensure INITIALIZING status from status
+ CompletableFuture<JobStatus> jobStatusFuture =
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+ Assert.assertEquals(JobStatus.INITIALIZING,
jobStatusFuture.get());
+
+ // ensure correct JobDetails
+ MultipleJobsDetails multiDetails =
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+ Assert.assertEquals(1, multiDetails.getJobs().size());
+ Assert.assertEquals(blockingJobGraph.getJobID(),
multiDetails.getJobs().iterator().next().getJobId());
+
+ // submission has succeeded, let the initialization finish.
+ blockingJobVertex.unblock();
+
+ // wait till job is running
+ CommonTestUtils.waitUntilJobManagerIsInitialized(() ->
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT).get());
+
+ dispatcherGateway.cancelJob(blockingJobGraph.getJobID(),
TIMEOUT).get();
+ }
+
+ @Test(timeout = 5_000L)
+ public void testInvalidCallDuringInitialization() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // create a job graph of a job that blocks forever
+ final BlockingJobVertex blockingJobVertex = new
BlockingJobVertex("testVertex");
+ blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+ JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID,
"blockingTestJob", blockingJobVertex);
Review comment:
This job-graph setup is duplicated from `testNonBlockingJobSubmission`. I'd suggest extracting it into a helper method that both tests can reuse.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -835,51 +796,50 @@ private void jobMasterFailed(JobID jobId, Throwable
cause) {
}
@Nonnull
- private <T> List<CompletableFuture<Optional<T>>>
queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>>
queryFunction) {
- final int numberJobsRunning = jobManagerRunnerFutures.size();
+ private <T> List<CompletableFuture<Optional<T>>>
queryJobMastersForInformation(Function<DispatcherJob, CompletableFuture<T>>
queryFunction) {
ArrayList<CompletableFuture<Optional<T>>>
optionalJobInformation = new ArrayList<>(
- numberJobsRunning);
+ runningJobs.size());
- for (JobID jobId : jobManagerRunnerFutures.keySet()) {
- final CompletableFuture<JobMasterGateway>
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
- final CompletableFuture<Optional<T>> optionalRequest =
jobMasterGatewayFuture
- .thenCompose(queryFunction::apply)
- .handle((T value, Throwable throwable) ->
Optional.ofNullable(value));
-
- optionalJobInformation.add(optionalRequest);
+ for (DispatcherJob job : runningJobs.values()) {
+ final CompletableFuture<Optional<T>> queryResult =
queryFunction.apply(job)
+ .handle((T value, Throwable t) ->
Optional.ofNullable(value));
+ optionalJobInformation.add(queryResult);
}
return optionalJobInformation;
}
- private CompletableFuture<Void> waitForTerminatingJobManager(JobID
jobId, JobGraph jobGraph, FunctionWithException<JobGraph,
CompletableFuture<Void>, ?> action) {
+ private CompletableFuture<Void> waitForTerminatingJobManager(JobID
jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
final CompletableFuture<Void> jobManagerTerminationFuture =
getJobTerminationFuture(jobId)
.exceptionally((Throwable throwable) -> {
- throw new CompletionException(
- new DispatcherException(
- String.format("Termination of
previous JobManager for job %s failed. Cannot submit job under the same job
id.", jobId),
- throwable)); });
-
- return jobManagerTerminationFuture.thenComposeAsync(
- FunctionUtils.uncheckedFunction((ignored) -> {
- jobManagerTerminationFutures.remove(jobId);
- return action.apply(jobGraph);
+ if (!ExceptionUtils.findThrowable(throwable,
JobInitializationException.class).isPresent()) {
+ throw new CompletionException(
+ new DispatcherException(
+
String.format("Termination of previous JobManager for job %s failed. Cannot
submit job under the same job id.", jobId),
+ throwable));
+ }
Review comment:
If we don't complete the `DispatcherJob.terminationFuture` with failures
from the `JobManagerRunner`, then we don't have to do this special casing here.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static junit.framework.Assert.fail;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final Time TIMEOUT = Time.seconds(10L);
+ private static final JobID TEST_JOB_ID = new JobID();
+
+ @Test
+ public void testStatusWhenInitializing() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+ Assert.assertThat(dispatcherJob.getResultFuture().isDone(),
is(false));
+ assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+ }
+
+ @Test
+ public void testStatusWhenRunning() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // finish initialization
+ testContext.setRunning();
+
+ assertJobStatus(dispatcherJob, JobStatus.RUNNING);
+
+ // result future not done
+ Assert.assertThat(dispatcherJob.getResultFuture().isDone(),
is(false));
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ }
+
+ @Test
+ public void testStatusWhenJobFinished() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // finish job
+ testContext.setRunning();
+ testContext.finishJob();
+
+ assertJobStatus(dispatcherJob, JobStatus.FINISHED);
+
+ // assert result future done
+ ArchivedExecutionGraph aeg =
dispatcherJob.getResultFuture().get();
+
+ Assert.assertThat(aeg.getState(), is(JobStatus.FINISHED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileInitializing() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+ assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+
+ CompletableFuture<Acknowledge> cancelFuture =
dispatcherJob.cancel(
+ TIMEOUT);
+
+ Assert.assertThat(cancelFuture.isDone(), is(false));
+ Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+
+ testContext.setRunning();
+ testContext.finishCancellation();
+
+ // assert that cancel future completes
+ cancelFuture.get();
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ // assert that the result future completes
+
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(),
is(JobStatus.CANCELED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileRunning() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ testContext.setRunning();
+ CompletableFuture<Acknowledge> cancelFuture =
dispatcherJob.cancel(TIMEOUT);
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+ testContext.finishCancellation();
+
+ cancelFuture.get();
+ assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(),
is(JobStatus.CANCELED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileFailed() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ RuntimeException exception = new RuntimeException("Artificial
failure in runner initialization");
+ testContext.failInitialization(exception);
+
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+ CommonTestUtils.assertThrows("Artificial failure",
ExecutionException.class, () -> dispatcherJob.cancel(TIMEOUT).get());
+
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+ }
+
+ @Test
+ public void testErrorWhileInitializing() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.dispatcherJob;
+
+ // now fail
+ RuntimeException exception = new RuntimeException("Artificial
failure in runner initialization");
+ testContext.failInitialization(exception);
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+ ArchivedExecutionGraph aeg =
dispatcherJob.getResultFuture().get();
+
Assert.assertThat(aeg.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()),
is(exception));
+
+ Assert.assertTrue(dispatcherJob.closeAsync().isDone() &&
dispatcherJob.closeAsync().isCompletedExceptionally());
Review comment:
I am not sure whether this should be the case.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -329,6 +345,125 @@ public void
testJobSubmissionWithPartialResourceConfigured() throws Exception {
}
}
+ @Test(timeout = 5_000L)
+ public void testNonBlockingJobSubmission() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // create a job graph of a job that blocks forever
+ final BlockingJobVertex blockingJobVertex = new
BlockingJobVertex("testVertex");
+ blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+ JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID,
"blockingTestJob", blockingJobVertex);
+
+ dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+ // ensure INITIALIZING status from status
+ CompletableFuture<JobStatus> jobStatusFuture =
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+ Assert.assertEquals(JobStatus.INITIALIZING,
jobStatusFuture.get());
+
+ // ensure correct JobDetails
+ MultipleJobsDetails multiDetails =
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+ Assert.assertEquals(1, multiDetails.getJobs().size());
+ Assert.assertEquals(blockingJobGraph.getJobID(),
multiDetails.getJobs().iterator().next().getJobId());
+
+ // submission has succeeded, let the initialization finish.
+ blockingJobVertex.unblock();
+
+ // wait till job is running
+ CommonTestUtils.waitUntilJobManagerIsInitialized(() ->
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT).get());
+
+ dispatcherGateway.cancelJob(blockingJobGraph.getJobID(),
TIMEOUT).get();
Review comment:
What does this call test?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static junit.framework.Assert.fail;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final Time TIMEOUT = Time.seconds(10L);
+ private static final JobID TEST_JOB_ID = new JobID();
+
+ @Test
+ public void testStatusWhenInitializing() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+ Assert.assertThat(dispatcherJob.getResultFuture().isDone(),
is(false));
+ assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+ }
+
+ @Test
+ public void testStatusWhenRunning() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // finish initialization
+ testContext.setRunning();
+
+ assertJobStatus(dispatcherJob, JobStatus.RUNNING);
+
+ // result future not done
+ Assert.assertThat(dispatcherJob.getResultFuture().isDone(),
is(false));
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ }
+
+ @Test
+ public void testStatusWhenJobFinished() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // finish job
+ testContext.setRunning();
+ testContext.finishJob();
+
+ assertJobStatus(dispatcherJob, JobStatus.FINISHED);
+
+ // assert result future done
+ ArchivedExecutionGraph aeg =
dispatcherJob.getResultFuture().get();
+
+ Assert.assertThat(aeg.getState(), is(JobStatus.FINISHED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileInitializing() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+ assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+
+ CompletableFuture<Acknowledge> cancelFuture =
dispatcherJob.cancel(
+ TIMEOUT);
+
+ Assert.assertThat(cancelFuture.isDone(), is(false));
+ Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+
+ testContext.setRunning();
+ testContext.finishCancellation();
+
+ // assert that cancel future completes
+ cancelFuture.get();
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ // assert that the result future completes
+
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(),
is(JobStatus.CANCELED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileRunning() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ testContext.setRunning();
+ CompletableFuture<Acknowledge> cancelFuture =
dispatcherJob.cancel(TIMEOUT);
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+ testContext.finishCancellation();
+
+ cancelFuture.get();
+ assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(),
is(JobStatus.CANCELED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileFailed() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ RuntimeException exception = new RuntimeException("Artificial
failure in runner initialization");
+ testContext.failInitialization(exception);
+
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+ CommonTestUtils.assertThrows("Artificial failure",
ExecutionException.class, () -> dispatcherJob.cancel(TIMEOUT).get());
+
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+ }
+
+ @Test
+ public void testErrorWhileInitializing() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.dispatcherJob;
+
+ // now fail
+ RuntimeException exception = new RuntimeException("Artificial
failure in runner initialization");
+ testContext.failInitialization(exception);
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+ ArchivedExecutionGraph aeg =
dispatcherJob.getResultFuture().get();
+
Assert.assertThat(aeg.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()),
is(exception));
+
+ Assert.assertTrue(dispatcherJob.closeAsync().isDone() &&
dispatcherJob.closeAsync().isCompletedExceptionally());
+ }
+
+ @Test
+ public void testCloseWhileInitializingSuccessfully() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ CompletableFuture<Void> closeFuture =
dispatcherJob.closeAsync();
+ Assert.assertThat(closeFuture.isDone(), is(false));
+
+ // set job running, so that we can cancel it
+ testContext.setRunning();
+
+ // assert future completes now
+ closeFuture.get();
+
+ // ensure the result future is complete (how it completes is up
to the JobManager)
+ CompletableFuture<ArchivedExecutionGraph> resultFuture =
dispatcherJob.getResultFuture();
+ CommonTestUtils.assertThrows("has not been finished",
ExecutionException.class,
+ resultFuture::get);
+ }
+
+ @Test
+ public void testCloseWhileInitializingErroneously() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ CompletableFuture<Void> closeFuture =
dispatcherJob.closeAsync();
+ Assert.assertThat(closeFuture.isDone(), is(false));
+
+ testContext.failInitialization(new RuntimeException("fail"));
+
+ // assert future completes now
+ CommonTestUtils.assertThrows("fail", ExecutionException.class,
+ closeFuture::get);
+
+ // ensure the result future is complete
+
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(),
is(JobStatus.FAILED));
+ }
+
+ @Test
+ public void testCloseWhileInitializingErroneouslyForRecovery() {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.RECOVERY);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ CompletableFuture<Void> closeFuture =
dispatcherJob.closeAsync();
+
+ testContext.failInitialization(new RuntimeException("fail"));
+
+ CommonTestUtils.assertThrows("fail", ExecutionException.class,
+ closeFuture::get);
+ // ensure the result future is completing exceptionally when
using RECOVERY execution
+ CommonTestUtils.assertThrows("fail", ExecutionException.class,
+ () -> dispatcherJob.getResultFuture().get());
+ }
+
+ @Test
+ public void testCloseWhileRunning() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // complete JobManager runner future to indicate to the
DispatcherJob that the Runner has been initialized
+ testContext.setRunning();
+
+ CompletableFuture<Void> closeFuture =
dispatcherJob.closeAsync();
+
+ closeFuture.get();
+
+ // result future should complete exceptionally.
+ CompletableFuture<ArchivedExecutionGraph> resultFuture =
dispatcherJob.getResultFuture();
+ CommonTestUtils.assertThrows("has not been finished",
ExecutionException.class,
+ resultFuture::get);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testUnavailableJobMasterGateway() {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+ dispatcherJob.getJobMasterGateway();
+ }
+
+ private TestContext createTestContext(Dispatcher.ExecutionType type) {
+ final JobVertex testVertex = new JobVertex("testVertex");
+ testVertex.setInvokableClass(NoOpInvokable.class);
+
+ JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob",
testVertex);
+ CompletableFuture<JobManagerRunner>
jobManagerRunnerCompletableFuture = new CompletableFuture<>();
+ DispatcherJob dispatcherJob =
DispatcherJob.createFor(jobManagerRunnerCompletableFuture,
+ jobGraph.getJobID(), jobGraph.getName(), type);
+
+ return new TestContext(
+ jobManagerRunnerCompletableFuture,
+ dispatcherJob,
+ jobGraph);
+ }
+
+ private static class TestContext {
+ private final CompletableFuture<JobManagerRunner>
jobManagerRunnerCompletableFuture;
+ private final DispatcherJob dispatcherJob;
+ private final JobGraph jobGraph;
+ private final TestingJobMasterGateway
mockRunningJobMasterGateway;
+ private final CompletableFuture<Acknowledge> cancellationFuture;
+
+ private JobStatus internalJobStatus = JobStatus.INITIALIZING;
+
+ public TestContext(
+ CompletableFuture<JobManagerRunner>
jobManagerRunnerCompletableFuture,
+ DispatcherJob dispatcherJob,
+ JobGraph jobGraph) {
+ this.jobManagerRunnerCompletableFuture =
jobManagerRunnerCompletableFuture;
+ this.dispatcherJob = dispatcherJob;
+ this.jobGraph = jobGraph;
+
+ this.cancellationFuture = new CompletableFuture<>();
+ this.mockRunningJobMasterGateway = new
TestingJobMasterGatewayBuilder()
+ .setRequestJobSupplier(() ->
CompletableFuture.completedFuture(ArchivedExecutionGraph.createFromInitializingJob(getJobID(),
"test", internalJobStatus, null, 1337)))
+ .setRequestJobDetailsSupplier(() -> {
+ JobDetails jobDetails = new
JobDetails(getJobID(), "", 0, 0, 0, internalJobStatus, 0,
+ new int[]{0, 0, 0, 0, 0, 0, 0,
0, 0}, 0);
+ return
CompletableFuture.completedFuture(jobDetails);
+ })
+ // once JobManagerRunner is initialized,
complete result future with CANCELLED AEG and ack cancellation.
+ .setCancelFunction(() -> {
+ internalJobStatus =
JobStatus.CANCELLING;
+ return cancellationFuture;
+ })
+ .build();
+ }
+
+ public JobID getJobID() {
+ return jobGraph.getJobID();
+ }
+
+ public void failInitialization(Throwable ex) {
+
jobManagerRunnerCompletableFuture.completeExceptionally(ex);
+ }
+
+ public DispatcherJob getDispatcherJob() {
+ return dispatcherJob;
+ }
+
+ public void setRunning() {
+ internalJobStatus = JobStatus.RUNNING;
+ TestingJobManagerRunner jobManagerRunner =
+ new TestingJobManagerRunner(getJobID(), false);
+
jobManagerRunner.getJobMasterGateway().complete(mockRunningJobMasterGateway);
+
jobManagerRunnerCompletableFuture.complete(jobManagerRunner);
+ }
+
+ public void finishJob() {
+ try {
+ internalJobStatus = JobStatus.FINISHED;
+
jobManagerRunnerCompletableFuture.get().getResultFuture()
+
.complete(ArchivedExecutionGraph.createFromInitializingJob(getJobID(), "test",
JobStatus.FINISHED, null, 1337));
Review comment:
Same hack as in `setRunning`. I'd suggest passing a `jobResultFuture`
to the `JobManagerRunner` that is created to complete
`jobManagerRunnerCompletableFuture`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -419,19 +554,29 @@ public void testWaitingForJobMasterLeadership() throws
Exception {
dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
- final CompletableFuture<JobStatus> jobStatusFuture =
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
+ CompletableFuture<JobStatus> jobStatusFuture =
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
assertThat(jobStatusFuture.isDone(), is(false));
Review comment:
Are you sure that this still holds? I thought it would at least return
`JobStatus.INITIALIZING`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static junit.framework.Assert.fail;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final Time TIMEOUT = Time.seconds(10L);
+ private static final JobID TEST_JOB_ID = new JobID();
+
+ @Test
+ public void testStatusWhenInitializing() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+ Assert.assertThat(dispatcherJob.getResultFuture().isDone(),
is(false));
+ assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+ }
+
+ @Test
+ public void testStatusWhenRunning() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // finish initialization
+ testContext.setRunning();
+
+ assertJobStatus(dispatcherJob, JobStatus.RUNNING);
+
+ // result future not done
+ Assert.assertThat(dispatcherJob.getResultFuture().isDone(),
is(false));
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ }
+
+ @Test
+ public void testStatusWhenJobFinished() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // finish job
+ testContext.setRunning();
+ testContext.finishJob();
+
+ assertJobStatus(dispatcherJob, JobStatus.FINISHED);
+
+ // assert result future done
+ ArchivedExecutionGraph aeg =
dispatcherJob.getResultFuture().get();
+
+ Assert.assertThat(aeg.getState(), is(JobStatus.FINISHED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileInitializing() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+ assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+
+ CompletableFuture<Acknowledge> cancelFuture =
dispatcherJob.cancel(
+ TIMEOUT);
+
+ Assert.assertThat(cancelFuture.isDone(), is(false));
+ Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+
+ testContext.setRunning();
+ testContext.finishCancellation();
+
+ // assert that cancel future completes
+ cancelFuture.get();
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ // assert that the result future completes
+
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(),
is(JobStatus.CANCELED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileRunning() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ testContext.setRunning();
+ CompletableFuture<Acknowledge> cancelFuture =
dispatcherJob.cancel(TIMEOUT);
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+ testContext.finishCancellation();
+
+ cancelFuture.get();
+ assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(),
is(JobStatus.CANCELED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileFailed() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ RuntimeException exception = new RuntimeException("Artificial
failure in runner initialization");
+ testContext.failInitialization(exception);
+
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+ CommonTestUtils.assertThrows("Artificial failure",
ExecutionException.class, () -> dispatcherJob.cancel(TIMEOUT).get());
+
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+ }
+
+ @Test
+ public void testErrorWhileInitializing() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.dispatcherJob;
+
+ // now fail
+ RuntimeException exception = new RuntimeException("Artificial
failure in runner initialization");
+ testContext.failInitialization(exception);
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+ ArchivedExecutionGraph aeg =
dispatcherJob.getResultFuture().get();
+
Assert.assertThat(aeg.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()),
is(exception));
+
+ Assert.assertTrue(dispatcherJob.closeAsync().isDone() &&
dispatcherJob.closeAsync().isCompletedExceptionally());
+ }
+
+ @Test
+ public void testCloseWhileInitializingSuccessfully() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ CompletableFuture<Void> closeFuture =
dispatcherJob.closeAsync();
+ Assert.assertThat(closeFuture.isDone(), is(false));
+
+ // set job running, so that we can cancel it
+ testContext.setRunning();
+
+ // assert future completes now
+ closeFuture.get();
+
+ // ensure the result future is complete (how it completes is up
to the JobManager)
+ CompletableFuture<ArchivedExecutionGraph> resultFuture =
dispatcherJob.getResultFuture();
+ CommonTestUtils.assertThrows("has not been finished",
ExecutionException.class,
+ resultFuture::get);
+ }
+
+ @Test
+ public void testCloseWhileInitializingErroneously() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ CompletableFuture<Void> closeFuture =
dispatcherJob.closeAsync();
+ Assert.assertThat(closeFuture.isDone(), is(false));
+
+ testContext.failInitialization(new RuntimeException("fail"));
+
+ // assert future completes now
+ CommonTestUtils.assertThrows("fail", ExecutionException.class,
+ closeFuture::get);
+
+ // ensure the result future is complete
+
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(),
is(JobStatus.FAILED));
+ }
+
+ @Test
+ public void testCloseWhileInitializingErroneouslyForRecovery() {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.RECOVERY);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ CompletableFuture<Void> closeFuture =
dispatcherJob.closeAsync();
+
+ testContext.failInitialization(new RuntimeException("fail"));
+
+ CommonTestUtils.assertThrows("fail", ExecutionException.class,
+ closeFuture::get);
+ // ensure the result future is completing exceptionally when
using RECOVERY execution
+ CommonTestUtils.assertThrows("fail", ExecutionException.class,
+ () -> dispatcherJob.getResultFuture().get());
+ }
+
+ @Test
+ public void testCloseWhileRunning() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // complete JobManager runner future to indicate to the
DispatcherJob that the Runner has been initialized
+ testContext.setRunning();
+
+ CompletableFuture<Void> closeFuture =
dispatcherJob.closeAsync();
+
+ closeFuture.get();
+
+ // result future should complete exceptionally.
+ CompletableFuture<ArchivedExecutionGraph> resultFuture =
dispatcherJob.getResultFuture();
+ CommonTestUtils.assertThrows("has not been finished",
ExecutionException.class,
+ resultFuture::get);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testUnavailableJobMasterGateway() {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+ dispatcherJob.getJobMasterGateway();
+ }
+
+ private TestContext createTestContext(Dispatcher.ExecutionType type) {
+ final JobVertex testVertex = new JobVertex("testVertex");
+ testVertex.setInvokableClass(NoOpInvokable.class);
+
+ JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob",
testVertex);
+ CompletableFuture<JobManagerRunner>
jobManagerRunnerCompletableFuture = new CompletableFuture<>();
+ DispatcherJob dispatcherJob =
DispatcherJob.createFor(jobManagerRunnerCompletableFuture,
+ jobGraph.getJobID(), jobGraph.getName(), type);
+
+ return new TestContext(
+ jobManagerRunnerCompletableFuture,
+ dispatcherJob,
+ jobGraph);
+ }
+
+ private static class TestContext {
+ private final CompletableFuture<JobManagerRunner>
jobManagerRunnerCompletableFuture;
+ private final DispatcherJob dispatcherJob;
+ private final JobGraph jobGraph;
+ private final TestingJobMasterGateway
mockRunningJobMasterGateway;
+ private final CompletableFuture<Acknowledge> cancellationFuture;
+
+ private JobStatus internalJobStatus = JobStatus.INITIALIZING;
+
+ public TestContext(
+ CompletableFuture<JobManagerRunner>
jobManagerRunnerCompletableFuture,
+ DispatcherJob dispatcherJob,
+ JobGraph jobGraph) {
+ this.jobManagerRunnerCompletableFuture =
jobManagerRunnerCompletableFuture;
+ this.dispatcherJob = dispatcherJob;
+ this.jobGraph = jobGraph;
+
+ this.cancellationFuture = new CompletableFuture<>();
+ this.mockRunningJobMasterGateway = new
TestingJobMasterGatewayBuilder()
+ .setRequestJobSupplier(() ->
CompletableFuture.completedFuture(ArchivedExecutionGraph.createFromInitializingJob(getJobID(),
"test", internalJobStatus, null, 1337)))
+ .setRequestJobDetailsSupplier(() -> {
+ JobDetails jobDetails = new
JobDetails(getJobID(), "", 0, 0, 0, internalJobStatus, 0,
+ new int[]{0, 0, 0, 0, 0, 0, 0,
0, 0}, 0);
+ return
CompletableFuture.completedFuture(jobDetails);
+ })
+ // once JobManagerRunner is initialized,
complete result future with CANCELLED AEG and ack cancellation.
+ .setCancelFunction(() -> {
+ internalJobStatus =
JobStatus.CANCELLING;
+ return cancellationFuture;
+ })
+ .build();
+ }
+
+ public JobID getJobID() {
+ return jobGraph.getJobID();
+ }
+
+ public void failInitialization(Throwable ex) {
+
jobManagerRunnerCompletableFuture.completeExceptionally(ex);
+ }
+
+ public DispatcherJob getDispatcherJob() {
+ return dispatcherJob;
+ }
+
+ public void setRunning() {
+ internalJobStatus = JobStatus.RUNNING;
+ TestingJobManagerRunner jobManagerRunner =
+ new TestingJobManagerRunner(getJobID(), false);
+
jobManagerRunner.getJobMasterGateway().complete(mockRunningJobMasterGateway);
Review comment:
This is a hack. I would suggest to add a
`TestingJobManagerRunnerBuilder` where one can set the result of the
`getJobMasterGateway()` function.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -329,6 +345,125 @@ public void
testJobSubmissionWithPartialResourceConfigured() throws Exception {
}
}
+ @Test(timeout = 5_000L)
+ public void testNonBlockingJobSubmission() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // create a job graph of a job that blocks forever
+ final BlockingJobVertex blockingJobVertex = new
BlockingJobVertex("testVertex");
+ blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+ JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID,
"blockingTestJob", blockingJobVertex);
+
+ dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+ // ensure INITIALIZING status from status
+ CompletableFuture<JobStatus> jobStatusFuture =
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+ Assert.assertEquals(JobStatus.INITIALIZING,
jobStatusFuture.get());
+
+ // ensure correct JobDetails
+ MultipleJobsDetails multiDetails =
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+ Assert.assertEquals(1, multiDetails.getJobs().size());
+ Assert.assertEquals(blockingJobGraph.getJobID(),
multiDetails.getJobs().iterator().next().getJobId());
+
+ // submission has succeeded, let the initialization finish.
+ blockingJobVertex.unblock();
+
+ // wait till job is running
+ CommonTestUtils.waitUntilJobManagerIsInitialized(() ->
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT).get());
+
+ dispatcherGateway.cancelJob(blockingJobGraph.getJobID(),
TIMEOUT).get();
+ }
+
+ @Test(timeout = 5_000L)
+ public void testInvalidCallDuringInitialization() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // create a job graph of a job that blocks forever
+ final BlockingJobVertex blockingJobVertex = new
BlockingJobVertex("testVertex");
+ blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+ JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID,
"blockingTestJob", blockingJobVertex);
+
+ dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+ // ensure INITIALIZING status
+ CompletableFuture<JobStatus> jobStatusFuture =
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+ Assert.assertEquals(JobStatus.INITIALIZING,
jobStatusFuture.get());
+
+ // this call is supposed to fail
+ boolean exceptionSeen = false;
+ try {
+
dispatcherGateway.triggerSavepoint(blockingJobGraph.getJobID(),
"file:///tmp/savepoint", false, TIMEOUT).get();
+ } catch (ExecutionException t) {
+ Assert.assertTrue(t.getCause() instanceof
UnavailableDispatcherOperationException);
+ exceptionSeen = true;
+ }
+ Assert.assertTrue(exceptionSeen);
+
+ // submission has succeeded, let the initialization finish.
+ blockingJobVertex.unblock();
+ }
+
+ @Test(timeout = 5_000L)
+ public void testCancellationDuringInitialization() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // create a job graph of a job that blocks forever
+ final BlockingJobVertex blockingJobVertex = new
BlockingJobVertex("testVertex");
+ blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+ JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID,
"blockingTestJob", blockingJobVertex);
Review comment:
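Duplicated code: the dispatcher setup and the blocking job graph creation are repeated verbatim in several tests; consider extracting them into a helper method.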
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -285,12 +299,14 @@ public void tearDown() throws Exception {
@Test
public void testJobSubmission() throws Exception {
dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
-
+ // grant leadership to job master so that we can call
dispatcherGateway.requestJobStatus().
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
- CompletableFuture<Acknowledge> acknowledgeFuture =
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
- acknowledgeFuture.get();
+ CommonTestUtils.waitUntilCondition(() ->
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get() ==
JobStatus.RUNNING,
+ Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)),
20L);
Review comment:
Why don't we wait on `jobMasterLeaderElectionService.getStartFuture`
instead of doing busy polling?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -603,6 +769,9 @@ public void testOnRemovedJobGraphDoesNotCleanUpHAFiles()
throws Exception {
.build();
dispatcher.start();
+ // we need to elect a jobmaster leader to be able to cancel the
job on the JobMaster.
+
jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
Review comment:
Why is this needed? Why do we need to cancel the job here?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -561,10 +725,12 @@ public void
testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
@Test
public void testJobSuspensionWhenDispatcherIsTerminated() throws
Exception {
dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ CommonTestUtils.waitUntilJobManagerIsInitialized(() ->
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get());
Review comment:
Why do we have to wait on this here?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -509,28 +657,41 @@ public void testFailingJobManagerRunnerCleanup() throws
Exception {
if (exception != null) {
throw exception;
}
- }));
+ }, getMockedRunningJobMasterGateway()));
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
- CompletableFuture<Acknowledge> submissionFuture =
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
- assertThat(submissionFuture.isDone(), is(false));
+
assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(),
TIMEOUT).get(), is(JobStatus.INITIALIZING));
queue.offer(Optional.of(testException));
- try {
- submissionFuture.get();
- fail("Should fail because we could not instantiate the
JobManagerRunner.");
- } catch (Exception e) {
- assertThat(ExceptionUtils.findThrowable(e, t ->
t.equals(testException)).isPresent(), is(true));
- }
-
- submissionFuture = dispatcherGateway.submitJob(jobGraph,
TIMEOUT);
+ // wait till job is failed
+ JobStatus status;
+ do {
+ status = dispatcherGateway.requestJobStatus(
+ jobGraph.getJobID(),
+ TIMEOUT).get();
+ Assert.assertThat(
+ status,
+
either(Matchers.is(JobStatus.INITIALIZING)).or(Matchers.is(JobStatus.FAILED)));
+ Thread.sleep(20);
+ } while (status != JobStatus.FAILED);
Review comment:
Waiting on `dispatcherGateway.requestJobResult` could save us the busy
waiting loop.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
##########
@@ -181,17 +186,21 @@ private void testBlobServerCleanup(final TestCase
testCase) throws Exception {
}
final CompletableFuture<JobSubmissionResult> submissionFuture =
miniCluster.submitJob(jobGraph);
+ final JobSubmissionResult jobSubmissionResult =
submissionFuture.get();
+
+ // wait until job is submitted
+ CommonTestUtils.waitUntilCondition(() ->
miniCluster.getJobStatus(jobGraph.getJobID()).get() != JobStatus.INITIALIZING,
+ Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)),
20L);
Review comment:
Why do we have to wait on this?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -617,16 +786,23 @@ public void testOnRemovedJobGraphDoesNotCleanUpHAFiles()
throws Exception {
@Nonnull
private final ThrowingRunnable<Exception>
jobManagerRunnerCreationLatch;
+ @Nullable
+ private final JobMasterGateway jobMasterGateway;
- BlockingJobManagerRunnerFactory(@Nonnull
ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
+ BlockingJobManagerRunnerFactory(@Nonnull
ThrowingRunnable<Exception> jobManagerRunnerCreationLatch, @Nullable
JobMasterGateway jobMasterGateway) {
this.jobManagerRunnerCreationLatch =
jobManagerRunnerCreationLatch;
+ this.jobMasterGateway = jobMasterGateway;
}
@Override
public TestingJobManagerRunner createJobManagerRunner(JobGraph
jobGraph, Configuration configuration, RpcService rpcService,
HighAvailabilityServices highAvailabilityServices, HeartbeatServices
heartbeatServices, JobManagerSharedServices jobManagerSharedServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler) throws Exception {
jobManagerRunnerCreationLatch.run();
- return super.createJobManagerRunner(jobGraph,
configuration, rpcService, highAvailabilityServices, heartbeatServices,
jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+ TestingJobManagerRunner runner =
super.createJobManagerRunner(jobGraph, configuration, rpcService,
highAvailabilityServices, heartbeatServices, jobManagerSharedServices,
jobManagerJobMetricGroupFactory, fatalErrorHandler);
+ if (jobMasterGateway != null) {
+
runner.getJobMasterGateway().complete(jobMasterGateway);
+ }
Review comment:
Let's not use this hack here. We don't know whether
`getJobMasterGateway` will always return the same future.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -690,4 +866,66 @@ public JobManagerRunner createJobManagerRunner(
}
}
+ private static class BlockingJobVertex extends JobVertex {
+ private final Object lock = new Object();
+ private boolean blocking = true;
+ public BlockingJobVertex(String name) {
+ super(name);
+ }
+
+ @Override
+ public void initializeOnMaster(ClassLoader loader) throws
Exception {
+ super.initializeOnMaster(loader);
+
+ while (true) {
+ synchronized (lock) {
+ if (!blocking) {
+ return;
+ }
+ lock.wait(10);
+ }
+ }
Review comment:
I'd suggest to use a `OneShotLatch` here.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -472,23 +618,25 @@ public void testBlockingJobManagerRunner() throws
Exception {
dispatcher = createAndStartDispatcher(
heartbeatServices,
haServices,
- new
BlockingJobManagerRunnerFactory(jobManagerRunnerCreationLatch::await));
-
+ new
BlockingJobManagerRunnerFactory(jobManagerRunnerCreationLatch::await,
getMockedRunningJobMasterGateway()));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
- final CompletableFuture<Acknowledge> submissionFuture =
dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
- assertThat(submissionFuture.isDone(), is(false));
+
assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(),
TIMEOUT).get(), is(JobStatus.INITIALIZING));
final CompletableFuture<Collection<String>>
metricQueryServiceAddressesFuture =
dispatcherGateway.requestMetricQueryServiceAddresses(Time.seconds(5L));
assertThat(metricQueryServiceAddressesFuture.get(),
is(empty()));
- assertThat(submissionFuture.isDone(), is(false));
+
assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(),
TIMEOUT).get(), is(JobStatus.INITIALIZING));
jobManagerRunnerCreationLatch.trigger();
- submissionFuture.get();
+ CommonTestUtils.waitUntilCondition(() ->
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get() ==
JobStatus.RUNNING,
+ Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)),
5L);
+
assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(),
TIMEOUT).get(), is(JobStatus.RUNNING));
Review comment:
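That's a bit redundant ("doppelt gemoppelt"): `waitUntilCondition` already waits until the status is RUNNING, so the subsequent assertion on RUNNING checks the same thing twice.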
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -419,19 +554,29 @@ public void testWaitingForJobMasterLeadership() throws
Exception {
dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
- final CompletableFuture<JobStatus> jobStatusFuture =
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
+ CompletableFuture<JobStatus> jobStatusFuture =
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
assertThat(jobStatusFuture.isDone(), is(false));
- try {
- jobStatusFuture.get(10, TimeUnit.MILLISECONDS);
- fail("Should not complete.");
- } catch (TimeoutException ignored) {
- // ignored
+ // Make sure that the jobstatus request is blocking after it
has left the INITIALIZING status
+ boolean timeoutSeen = false;
+ while (!timeoutSeen) {
+ try {
+ jobStatusFuture =
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
+ JobStatus jobStatus = jobStatusFuture.get(10,
TimeUnit.MILLISECONDS);
+ if (jobStatus != JobStatus.INITIALIZING) {
+ fail("Should not complete.");
+ } else {
+ Thread.sleep(10); // give more time to
initialize
+ }
+ } catch (TimeoutException ignored) {
+ timeoutSeen = true;
+ }
}
-
+ // Job is initialized. Make the master leader
jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
+ // ensure that the future is now completing
Review comment:
Maybe it would be better to test the completion of the leader future
directly on the `JobManagerRunnerImpl`.
I have to admit that I am not a huge fan of busy waiting loops. Whenever you
see one, consider whether the test could be expressed differently.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
##########
@@ -166,21 +167,28 @@ public void testResourceCleanupUnderLeadershipChange()
throws Exception {
defaultDispatcherRunnerFactory)) {
// initial run
- DispatcherGateway dispatcherGateway =
grantLeadership(dispatcherLeaderElectionService);
+ final DispatcherGateway dispatcherGateway =
grantLeadership(dispatcherLeaderElectionService);
LOG.info("Initial job submission {}.",
jobGraph.getJobID());
dispatcherGateway.submitJob(jobGraph,
TESTING_TIMEOUT).get();
+ // wait until job is running
+ CommonTestUtils.waitUntilCondition(() ->
+
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TESTING_TIMEOUT).get()
== JobStatus.RUNNING, Deadline.fromNow(VERIFICATION_TIMEOUT), 20L);
Review comment:
Why is this required here?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -329,6 +345,125 @@ public void
testJobSubmissionWithPartialResourceConfigured() throws Exception {
}
}
+ @Test(timeout = 5_000L)
+ public void testNonBlockingJobSubmission() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // create a job graph of a job that blocks forever
+ final BlockingJobVertex blockingJobVertex = new
BlockingJobVertex("testVertex");
+ blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+ JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID,
"blockingTestJob", blockingJobVertex);
+
+ dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+ // ensure INITIALIZING status from status
+ CompletableFuture<JobStatus> jobStatusFuture =
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+ Assert.assertEquals(JobStatus.INITIALIZING,
jobStatusFuture.get());
+
+ // ensure correct JobDetails
+ MultipleJobsDetails multiDetails =
dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+ Assert.assertEquals(1, multiDetails.getJobs().size());
+ Assert.assertEquals(blockingJobGraph.getJobID(),
multiDetails.getJobs().iterator().next().getJobId());
+
+ // submission has succeeded, let the initialization finish.
+ blockingJobVertex.unblock();
+
+ // wait till job is running
+ CommonTestUtils.waitUntilJobManagerIsInitialized(() ->
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT).get());
+
+ dispatcherGateway.cancelJob(blockingJobGraph.getJobID(),
TIMEOUT).get();
+ }
+
+ @Test(timeout = 5_000L)
+ public void testInvalidCallDuringInitialization() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // create a job graph of a job that blocks forever
+ final BlockingJobVertex blockingJobVertex = new
BlockingJobVertex("testVertex");
+ blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+ JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID,
"blockingTestJob", blockingJobVertex);
+
+ dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+ // ensure INITIALIZING status
+ CompletableFuture<JobStatus> jobStatusFuture =
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+ Assert.assertEquals(JobStatus.INITIALIZING,
jobStatusFuture.get());
+
+ // this call is supposed to fail
+ boolean exceptionSeen = false;
+ try {
+
dispatcherGateway.triggerSavepoint(blockingJobGraph.getJobID(),
"file:///tmp/savepoint", false, TIMEOUT).get();
+ } catch (ExecutionException t) {
+ Assert.assertTrue(t.getCause() instanceof
UnavailableDispatcherOperationException);
+ exceptionSeen = true;
+ }
+ Assert.assertTrue(exceptionSeen);
+
+ // submission has succeeded, let the initialization finish.
+ blockingJobVertex.unblock();
+ }
+
+ @Test(timeout = 5_000L)
+ public void testCancellationDuringInitialization() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // create a job graph of a job that blocks forever
+ final BlockingJobVertex blockingJobVertex = new
BlockingJobVertex("testVertex");
+ blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+ JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID,
"blockingTestJob", blockingJobVertex);
+
+ dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+
Assert.assertThat(dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(),
TIMEOUT).get(), is(JobStatus.INITIALIZING));
+
+ // submission has succeeded, now cancel the job
+ CompletableFuture<Acknowledge> cancellationFuture =
dispatcherGateway.cancelJob(blockingJobGraph.getJobID(), TIMEOUT);
+
Assert.assertThat(dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(),
TIMEOUT).get(), is(JobStatus.CANCELLING));
+ Assert.assertThat(cancellationFuture.isDone(), is(false));
+ // unblock
+ blockingJobVertex.unblock();
+ // wait until cancelled
+ cancellationFuture.get();
+
Assert.assertThat(dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(),
TIMEOUT).get(), is(JobStatus.CANCELED));
+ }
+
+ @Test
+ public void testErrorDuringInitialization() throws Exception {
+ dispatcher = createAndStartDispatcher(heartbeatServices,
haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID,
createdJobManagerRunnerLatch));
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // create a job graph that fails during initialization
+ final FailingInitializationJobVertex
failingInitializationJobVertex = new FailingInitializationJobVertex(
+ "testVertex");
+
failingInitializationJobVertex.setInvokableClass(NoOpInvokable.class);
+ JobGraph blockingJobGraph = new JobGraph(TEST_JOB_ID,
"failingTestJob", failingInitializationJobVertex);
+
+ dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT).get();
+
+ // wait till job has failed
+ JobStatus status;
+ do {
+ CompletableFuture<JobStatus> statusFuture =
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT);
+ status = statusFuture.get();
+ Thread.sleep(50);
+ Assert.assertThat(status,
either(is(JobStatus.INITIALIZING)).or(is(JobStatus.FAILED)));
+ } while (status != JobStatus.FAILED);
Review comment:
Can't we replace this with `dispatcherGateway.requestJobResult` and then
wait on the returned future?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java
##########
@@ -134,14 +135,13 @@ public void
leaderChange_afterJobSubmission_recoversSubmittedJob() throws Except
final UUID firstLeaderSessionId = UUID.randomUUID();
final DispatcherGateway firstDispatcherGateway =
electLeaderAndRetrieveGateway(firstLeaderSessionId);
-
firstDispatcherGateway.submitJob(jobGraph,
TIMEOUT).get();
+ CommonTestUtils.waitUntilJobManagerIsInitialized(() ->
firstDispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get());
Review comment:
Why is this needed?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
##########
@@ -103,6 +104,8 @@ public void testReelectionOfDispatcher() throws Exception {
submissionFuture.get();
+ CommonTestUtils.waitUntilJobManagerIsInitialized(() ->
miniCluster.getJobStatus(jobId).get());
Review comment:
Why is this required?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static junit.framework.Assert.fail;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final Time TIMEOUT = Time.seconds(10L);
+ private static final JobID TEST_JOB_ID = new JobID();
+
+ @Test
+ public void testStatusWhenInitializing() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+ Assert.assertThat(dispatcherJob.getResultFuture().isDone(),
is(false));
+ assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+ }
+
+ @Test
+ public void testStatusWhenRunning() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // finish initialization
+ testContext.setRunning();
+
+ assertJobStatus(dispatcherJob, JobStatus.RUNNING);
+
+ // result future not done
+ Assert.assertThat(dispatcherJob.getResultFuture().isDone(),
is(false));
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ }
+
+ @Test
+ public void testStatusWhenJobFinished() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ // finish job
+ testContext.setRunning();
+ testContext.finishJob();
+
+ assertJobStatus(dispatcherJob, JobStatus.FINISHED);
+
+ // assert result future done
+ ArchivedExecutionGraph aeg =
dispatcherJob.getResultFuture().get();
+
+ Assert.assertThat(aeg.getState(), is(JobStatus.FINISHED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileInitializing() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+ assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+
+ CompletableFuture<Acknowledge> cancelFuture =
dispatcherJob.cancel(
+ TIMEOUT);
+
+ Assert.assertThat(cancelFuture.isDone(), is(false));
+ Assert.assertThat(dispatcherJob.isInitialized(), is(false));
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+
+ testContext.setRunning();
+ testContext.finishCancellation();
+
+ // assert that cancel future completes
+ cancelFuture.get();
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ // assert that the result future completes
+
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(),
is(JobStatus.CANCELED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileRunning() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ testContext.setRunning();
+ CompletableFuture<Acknowledge> cancelFuture =
dispatcherJob.cancel(TIMEOUT);
+
+ assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+ testContext.finishCancellation();
+
+ cancelFuture.get();
+ assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+
Assert.assertThat(dispatcherJob.getResultFuture().get().getState(),
is(JobStatus.CANCELED));
+ }
+
+ @Test
+ public void testStatusWhenCancellingWhileFailed() throws
+ Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ RuntimeException exception = new RuntimeException("Artificial
failure in runner initialization");
+ testContext.failInitialization(exception);
+
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+ CommonTestUtils.assertThrows("Artificial failure",
ExecutionException.class, () -> dispatcherJob.cancel(TIMEOUT).get());
+
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+ }
+
+ @Test
+ public void testErrorWhileInitializing() throws Exception {
+ TestContext testContext =
createTestContext(Dispatcher.ExecutionType.SUBMISSION);
+ DispatcherJob dispatcherJob = testContext.dispatcherJob;
+
+ // now fail
+ RuntimeException exception = new RuntimeException("Artificial
failure in runner initialization");
+ testContext.failInitialization(exception);
+
+ Assert.assertThat(dispatcherJob.isInitialized(), is(true));
+ assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+ ArchivedExecutionGraph aeg =
dispatcherJob.getResultFuture().get();
+
Assert.assertThat(aeg.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()),
is(exception));
+
+ Assert.assertTrue(dispatcherJob.closeAsync().isDone() &&
dispatcherJob.closeAsync().isCompletedExceptionally());
Review comment:
This condition is actually covered by
`testCloseWhileInitializingErroneously`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
##########
@@ -351,4 +352,53 @@ public static ArchivedExecutionGraph
createFrom(ExecutionGraph executionGraph) {
executionGraph.getCheckpointStatsSnapshot(),
executionGraph.getStateBackendName().orElse(null));
}
+
+ /**
+ * Create a sparse ArchivedExecutionGraph for a job while it is still
initializing.
+ * Most fields will be empty, only job status and error-related fields
are set.
+ */
+ public static ArchivedExecutionGraph createFromInitializingJob(
+ JobID jobId,
+ String jobName,
+ JobStatus jobStatus,
+ @Nullable Throwable throwable,
+ long initializationTimestamp) {
+
+ long failureTime = System.currentTimeMillis();
Review comment:
It does not matter if this will cost us 5ms.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
##########
@@ -135,6 +138,8 @@ public void testReelectionOfJobMaster() throws Exception {
submissionFuture.get();
+ CommonTestUtils.waitUntilJobManagerIsInitialized(() ->
miniCluster.getJobStatus(jobId).get());
Review comment:
Same here?
##########
File path:
flink-table/flink-table-planner-blink/src/test/resources/log4j2-test.properties
##########
@@ -18,7 +18,7 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = DEBUG
Review comment:
Please revert this change so that the root logger level stays OFF.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
##########
@@ -132,6 +132,7 @@ public void testJobRecoveryWithFailingTaskExecutor() throws
Exception {
private CompletableFuture<JobResult>
submitJobAndWaitUntilRunning(JobGraph jobGraph) throws Exception {
miniCluster.submitJob(jobGraph).get();
+ CommonTestUtils.waitUntilJobManagerIsInitialized(() ->
miniCluster.getJobStatus(jobGraph.getJobID()).get());
Review comment:
Why is this needed here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org